From b2e7620846d03d7892462c9417ec35f5bf8d4e0a Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 29 Sep 2022 16:28:57 +0800 Subject: [PATCH 01/12] refactor code --- .../DeltaMerge/DMSegmentThreadInputStream.h | 36 +++-- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 14 +- dbms/src/Storages/DeltaMerge/Segment.cpp | 138 +++++++++++------- dbms/src/Storages/DeltaMerge/Segment.h | 11 +- .../DeltaMerge/SegmentReadTaskPool.cpp | 35 ++++- .../Storages/DeltaMerge/SegmentReadTaskPool.h | 16 +- 6 files changed, 161 insertions(+), 89 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index 9eebb33c18f..bc38361c294 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -47,8 +47,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream const RSOperatorPtr & filter_, UInt64 max_version_, size_t expected_block_size_, - bool is_raw_, - bool do_delete_mark_filter_for_raw_, + ReadMode read_mode_, const int extra_table_id_index, const TableID physical_table_id, const String & req_id) @@ -60,8 +59,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream , header(toEmptyBlock(columns_to_read)) , max_version(max_version_) , expected_block_size(expected_block_size_) - , is_raw(is_raw_) - , do_delete_mark_filter_for_raw(do_delete_mark_filter_for_raw_) + , read_mode(read_mode_) , extra_table_id_index(extra_table_id_index) , physical_table_id(physical_table_id) , log(Logger::get(NAME, req_id)) @@ -102,26 +100,35 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream } cur_segment = task->segment; - if (is_raw) + auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); + switch (read_mode) { - cur_stream = cur_segment->getInputStreamRaw( + case Normal: + cur_stream = cur_segment->getInputStream( *dm_context, columns_to_read, task->read_snapshot, task->ranges, filter, - do_delete_mark_filter_for_raw); - } - else - { - cur_stream = cur_segment->getInputStream( + max_version, + block_size); + break; + case Fast: + cur_stream = cur_segment->getInputStreamFast( *dm_context, columns_to_read, task->read_snapshot, task->ranges, filter, - max_version, - std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows))); + block_size); + break; + case Raw: + cur_stream = cur_segment->getInputStreamRaw( + *dm_context, + columns_to_read, + task->read_snapshot, + task->ranges); + break; } LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo()); } @@ -172,8 +179,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream Block header; const UInt64 max_version; const size_t expected_block_size; - const bool is_raw; - const bool do_delete_mark_filter_for_raw; + const ReadMode read_mode; // position of the ExtraPhysTblID column in column_names parameter in the StorageDeltaMerge::read function. const int extra_table_id_index; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 588bcd4a55b..1d09ba2d35b 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -845,7 +845,7 @@ void DeltaMergeStore::compact(const Context & db_context, const RowKeyRange & ra } } -// Read data without mvcc filtering && delete mark != 0 filtering. +// Read data without mvcc filtering. // just for debug // readRaw is called under 'selraw xxxx' BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, @@ -902,8 +902,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, EMPTY_FILTER, std::numeric_limits::max(), DEFAULT_BLOCK_SIZE, - /* is_raw = */ true, - /* do_delete_mark_filter_for_raw = */ false, + /* read_mode */ Raw, std::move(tasks), after_segment_read, req_info); @@ -931,8 +930,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, EMPTY_FILTER, std::numeric_limits::max(), DEFAULT_BLOCK_SIZE, - /* is_raw_ */ true, - /* do_delete_mark_filter_for_raw_ */ false, // don't do filter based on del_mark = 1 + /* read_mode */ Raw, extra_table_id_index, physical_table_id, req_info); @@ -986,8 +984,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, filter, max_version, expected_block_size, - /* is_raw = */ is_fast_scan, - /* do_delete_mark_filter_for_raw = */ is_fast_scan, + /* read_mode = */ is_fast_scan ? Fast : Normal, std::move(tasks), after_segment_read, log_tracing_id); @@ -1015,8 +1012,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, filter, max_version, expected_block_size, - /* is_raw_= */ is_fast_scan, - /* do_delete_mark_filter_for_raw_= */ is_fast_scan, + /* read_mode = */ is_fast_scan ? Fast : Normal, extra_table_id_index, physical_table_id, log_tracing_id); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 6bd484e3a62..f57534411a9 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -45,6 +45,8 @@ #include #include +#include "Storages/DeltaMerge/Filter/RSOperator.h" + namespace ProfileEvents { extern const Event DMWriteBlock; @@ -507,64 +509,58 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co return data_stream; } -/// we will call getInputStreamRaw in two condition: -/// 1. Using 'selraw xxxx' statement, which is always in test for debug. (when filter_delete_mark = false) -/// In this case, we will read all the data without mvcc filtering, -/// del_mark != 0 filtering and sorted merge. -/// We will just read all the data and return. -/// 2. We read in fast mode. (when filter_delete_mark = true) -/// In this case, we will read all the data without mvcc filtering and sorted merge, -/// but we will do del_mark != 0 filtering. -BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, - const ColumnDefines & columns_to_read, - const SegmentSnapshotPtr & segment_snap, - const RowKeyRanges & data_ranges, - const RSOperatorPtr & filter, - bool filter_delete_mark, - size_t expected_block_size) +/// We call getInputStreamFast when we read in fast mode. +/// In this case, we will read all the data in delta and stable, and then merge them without sorting. +/// Besides, we will do del_mark != 0 filtering to drop the deleted rows. +/// In conclusion, the output is unsorted, and does not do mvcc filtering. +BlockInputStreamPtr Segment::getInputStreamFast( + const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & data_ranges, + const RSOperatorPtr & filter, + size_t expected_block_size) { - /// Now, we use filter_delete_mark to determine whether it is in fast mode or just from `selraw * xxxx` - /// But this way seems not to be robustness enough, maybe we need another flag? auto new_columns_to_read = std::make_shared(); // new_columns_to_read need at most columns_to_read.size() + 2, due to may extra insert into the handle column and del_mark column. new_columns_to_read->reserve(columns_to_read.size() + 2); new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle)); - if (filter_delete_mark) - { - new_columns_to_read->push_back(getTagColumnDefine()); - } - - bool enable_handle_clean_read = filter_delete_mark; - bool enable_del_clean_read = filter_delete_mark; + new_columns_to_read->push_back(getTagColumnDefine()); + + /// When we read in fast mode, we can try to do the following optimization: + /// 1. Handle Column Optimization: + /// when the columns_to_read does not include HANDLE_COLUMN, + /// we can try to skip reading the handle column if the pack's handle range is fully within read range. + /// Thus, in this case, we set enable_handle_clean_read = true. + /// 2. Del Column Optimization: + /// when the columns_to_read does not include TAG_COLUMN, + /// we can try to skip reading the del column if the pack has no deleted rows. + /// Thus, in this case, we set enable_del_clean_read = true. + /// 3. Version Column Optimization: + /// if the columns_to_read does not include VERSION_COLUMN, + /// we don't need to read version column, thus we don't force push version column into new_columns_to_read. + + bool enable_handle_clean_read = true; + bool enable_del_clean_read = true; for (const auto & c : columns_to_read) { - if (c.id != EXTRA_HANDLE_COLUMN_ID) + if (c.id == EXTRA_HANDLE_COLUMN_ID) { - if (filter_delete_mark && c.id == TAG_COLUMN_ID) - { - enable_del_clean_read = false; - } - else - { - new_columns_to_read->push_back(c); - } + enable_handle_clean_read = false; + } + else if (c.id == TAG_COLUMN_ID) + { + enable_del_clean_read = false; } else { - enable_handle_clean_read = false; + new_columns_to_read->push_back(c); } } - /// when we read in fast mode, if columns_to_read does not include EXTRA_HANDLE_COLUMN_ID, - /// we can try to use clean read to make optimization in stable part. - /// when the pack is under totally data_ranges and has no rows whose del_mark = 1 --> we don't need read handle_column/tag_column/version_column - /// when the pack is under totally data_ranges and has rows whose del_mark = 1 --> we don't need read handle_column/version_column - /// others --> we don't need read version_column - /// Thus, in fast mode, if we don't need read handle_column, we set enable_handle_clean_read as true. - /// If we don't need read del_column, we set enable_del_clean_read as true BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream( dm_context, *new_columns_to_read, @@ -573,7 +569,7 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, std::numeric_limits::max(), expected_block_size, /* enable_handle_clean_read */ enable_handle_clean_read, - /* is_fast_scan */ filter_delete_mark, + /* is_fast_scan */ true, /* enable_del_clean_read */ enable_del_clean_read); BlockInputStreamPtr delta_stream = std::make_shared(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range); @@ -581,17 +577,61 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, delta_stream = std::make_shared>(delta_stream, data_ranges, 0); stable_stream = std::make_shared>(stable_stream, data_ranges, 0); - if (filter_delete_mark) + delta_stream = std::make_shared(delta_stream, columns_to_read, dm_context.tracing_id); + stable_stream = std::make_shared(stable_stream, columns_to_read, dm_context.tracing_id); + + BlockInputStreams streams; + + if (dm_context.read_delta_only) { - delta_stream = std::make_shared(delta_stream, columns_to_read, dm_context.tracing_id); - stable_stream = std::make_shared(stable_stream, columns_to_read, dm_context.tracing_id); + streams.push_back(delta_stream); + } + else if (dm_context.read_stable_only) + { + streams.push_back(stable_stream); } else { - delta_stream = std::make_shared(delta_stream, columns_to_read); - stable_stream = std::make_shared(stable_stream, columns_to_read); + streams.push_back(delta_stream); + streams.push_back(stable_stream); + } + return std::make_shared(streams, dm_context.tracing_id); +} + +/// We call getInputStreamRaw in 'selraw xxxx' statement, which is always in test for debug. +/// In this case, we will read all the data without mvcc filtering and sorted merging. +BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & data_ranges, + size_t expected_block_size) +{ + auto new_columns_to_read = std::make_shared(); + + new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle)); + + for (const auto & c : columns_to_read) + { + if (c.id != EXTRA_HANDLE_COLUMN_ID) + new_columns_to_read->push_back(c); } + BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream( + dm_context, + *new_columns_to_read, + data_ranges, + EMPTY_FILTER, + std::numeric_limits::max(), + expected_block_size, + /* enable_handle_clean_read */ false); + + BlockInputStreamPtr delta_stream = std::make_shared(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range); + + delta_stream = std::make_shared>(delta_stream, data_ranges, 0); + stable_stream = std::make_shared>(stable_stream, data_ranges, 0); + + delta_stream = std::make_shared(delta_stream, columns_to_read); + stable_stream = std::make_shared(stable_stream, columns_to_read); BlockInputStreams streams; @@ -611,12 +651,12 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, return std::make_shared(streams, dm_context.tracing_id); } -BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read, bool filter_delete_mark) +BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read) { auto segment_snap = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfReadRaw); if (!segment_snap) return {}; - return getInputStreamRaw(dm_context, columns_to_read, segment_snap, {rowkey_range}, EMPTY_FILTER, filter_delete_mark); + return getInputStreamRaw(dm_context, columns_to_read, segment_snap, {rowkey_range}); } SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & schema_snap) const diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 1bda20c8bf4..9192b7802d5 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -179,19 +179,24 @@ class Segment : private boost::noncopyable size_t expected_block_size = DEFAULT_BLOCK_SIZE, bool reorganize_block = true) const; - BlockInputStreamPtr getInputStreamRaw( + BlockInputStreamPtr getInputStreamFast( const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & data_ranges, const RSOperatorPtr & filter, - bool filter_delete_mark = true, size_t expected_block_size = DEFAULT_BLOCK_SIZE); BlockInputStreamPtr getInputStreamRaw( const DMContext & dm_context, const ColumnDefines & columns_to_read, - bool filter_delete_mark = false); + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & data_ranges, + size_t expected_block_size = DEFAULT_BLOCK_SIZE); + + BlockInputStreamPtr getInputStreamRaw( + const DMContext & dm_context, + const ColumnDefines & columns_to_read); /// For those split, merge and mergeDelta methods, we should use prepareXXX/applyXXX combo in real production. /// split(), merge() and mergeDelta() are only used in test cases. diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 9e8c91e8094..f4cc49cdcd0 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -100,14 +100,35 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t MemoryTrackerSetter setter(true, mem_tracker.get()); auto seg = t->segment; BlockInputStreamPtr stream; - if (is_raw) + auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); + switch (read_mode) { - stream = seg->getInputStreamRaw(*dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, do_range_filter_for_raw); - } - else - { - auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); - stream = seg->getInputStream(*dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, max_version, block_size); + case Normal: + stream = seg->getInputStream( + *dm_context, + columns_to_read, + t->read_snapshot, + t->ranges, + filter, + max_version, + block_size); + break; + case Fast: + stream = seg->getInputStreamFast( + *dm_context, + columns_to_read, + t->read_snapshot, + t->ranges, + filter, + block_size); + break; + case Raw: + stream = seg->getInputStreamRaw( + *dm_context, + columns_to_read, + t->read_snapshot, + t->ranges); + break; } LOG_FMT_DEBUG(log, "getInputStream succ, pool_id={} segment_id={}", pool_id, seg->segmentId()); return stream; diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 5211d12a334..c34ea6d24a6 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -111,6 +111,13 @@ class BlockStat std::atomic total_bytes; }; +enum ReadMode +{ + Normal = 0, + Fast, + Raw, +}; + class SegmentReadTaskPool : private boost::noncopyable { public: @@ -121,8 +128,7 @@ class SegmentReadTaskPool : private boost::noncopyable const RSOperatorPtr & filter_, uint64_t max_version_, size_t expected_block_size_, - bool is_raw_, - bool do_range_filter_for_raw_, + ReadMode read_mode_, SegmentReadTasks && tasks_, AfterSegmentRead after_segment_read_, const String & tracing_id) @@ -133,8 +139,7 @@ class SegmentReadTaskPool : private boost::noncopyable , filter(filter_) , max_version(max_version_) , expected_block_size(expected_block_size_) - , is_raw(is_raw_) - , do_range_filter_for_raw(do_range_filter_for_raw_) + , read_mode(read_mode_) , tasks(std::move(tasks_)) , after_segment_read(after_segment_read_) , log(Logger::get("SegmentReadTaskPool", tracing_id)) @@ -214,8 +219,7 @@ class SegmentReadTaskPool : private boost::noncopyable RSOperatorPtr filter; const uint64_t max_version; const size_t expected_block_size; - const bool is_raw; - const bool do_range_filter_for_raw; + const ReadMode read_mode; SegmentReadTasks tasks; AfterSegmentRead after_segment_read; std::mutex mutex; From b0988f8eae6502c2395addde5ff314929fa1aa1c Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 29 Sep 2022 16:32:05 +0800 Subject: [PATCH 02/12] update code --- dbms/src/Storages/DeltaMerge/Segment.cpp | 2 +- dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index f57534411a9..d1de9cc483c 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -599,7 +599,7 @@ BlockInputStreamPtr Segment::getInputStreamFast( } /// We call getInputStreamRaw in 'selraw xxxx' statement, which is always in test for debug. -/// In this case, we will read all the data without mvcc filtering and sorted merging. +/// In this case, we will read all the data without mvcc filtering and sorted merge. BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index c34ea6d24a6..9ecea73e2a6 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -113,9 +113,9 @@ class BlockStat enum ReadMode { - Normal = 0, - Fast, - Raw, + Normal = 0, // Read in Normal Mode. + Fast, // Read in Fast Mode, read data without mvcc and sorted merge, but filter the data whose del_mark = 1. + Raw, // Just for 'selraw * ', always using in debug code. }; class SegmentReadTaskPool : private boost::noncopyable From 2be41bbf723f8cd3ea3407dbcd152c736c559c6c Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 29 Sep 2022 17:29:19 +0800 Subject: [PATCH 03/12] remove useless code --- dbms/src/Storages/DeltaMerge/Segment.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index d1de9cc483c..e95df5a18d6 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -45,8 +45,6 @@ #include #include -#include "Storages/DeltaMerge/Filter/RSOperator.h" - namespace ProfileEvents { extern const Event DMWriteBlock; From 51ad9a3a9642344e2d317a43e96a703d5765e224 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 29 Sep 2022 18:28:04 +0800 Subject: [PATCH 04/12] update enum class --- dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h | 6 +++--- dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 8 ++++---- dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp | 6 +++--- dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index bc38361c294..6bde0c25074 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -103,7 +103,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); switch (read_mode) { - case Normal: + case ReadMode::Normal: cur_stream = cur_segment->getInputStream( *dm_context, columns_to_read, @@ -113,7 +113,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream max_version, block_size); break; - case Fast: + case ReadMode::Fast: cur_stream = cur_segment->getInputStreamFast( *dm_context, columns_to_read, @@ -122,7 +122,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream filter, block_size); break; - case Raw: + case ReadMode::Raw: cur_stream = cur_segment->getInputStreamRaw( *dm_context, columns_to_read, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 1d09ba2d35b..6b22f690a82 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -902,7 +902,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, EMPTY_FILTER, std::numeric_limits::max(), DEFAULT_BLOCK_SIZE, - /* read_mode */ Raw, + /* read_mode */ ReadMode::Raw, std::move(tasks), after_segment_read, req_info); @@ -930,7 +930,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, EMPTY_FILTER, std::numeric_limits::max(), DEFAULT_BLOCK_SIZE, - /* read_mode */ Raw, + /* read_mode */ ReadMode::Raw, extra_table_id_index, physical_table_id, req_info); @@ -984,7 +984,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, filter, max_version, expected_block_size, - /* read_mode = */ is_fast_scan ? Fast : Normal, + /* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal, std::move(tasks), after_segment_read, log_tracing_id); @@ -1012,7 +1012,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, filter, max_version, expected_block_size, - /* read_mode = */ is_fast_scan ? Fast : Normal, + /* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal, extra_table_id_index, physical_table_id, log_tracing_id); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index f4cc49cdcd0..27811691cb8 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -103,7 +103,7 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); switch (read_mode) { - case Normal: + case ReadMode::Normal: stream = seg->getInputStream( *dm_context, columns_to_read, @@ -113,7 +113,7 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t max_version, block_size); break; - case Fast: + case ReadMode::Fast: stream = seg->getInputStreamFast( *dm_context, columns_to_read, @@ -122,7 +122,7 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t filter, block_size); break; - case Raw: + case ReadMode::Raw: stream = seg->getInputStreamRaw( *dm_context, columns_to_read, diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 9ecea73e2a6..3be848098ba 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -111,9 +111,9 @@ class BlockStat std::atomic total_bytes; }; -enum ReadMode +enum class ReadMode { - Normal = 0, // Read in Normal Mode. + Normal, // Read in Normal Mode. Fast, // Read in Fast Mode, read data without mvcc and sorted merge, but filter the data whose del_mark = 1. Raw, // Just for 'selraw * ', always using in debug code. }; From a31ed93a0c592c74afc49dc69030632581ec451c Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Fri, 30 Sep 2022 21:04:11 +0800 Subject: [PATCH 05/12] Update dbms/src/Storages/DeltaMerge/Segment.cpp Co-authored-by: Wenxuan --- dbms/src/Storages/DeltaMerge/Segment.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index e95df5a18d6..834689aafaf 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -529,9 +529,9 @@ BlockInputStreamPtr Segment::getInputStreamFast( /// When we read in fast mode, we can try to do the following optimization: /// 1. Handle Column Optimization: - /// when the columns_to_read does not include HANDLE_COLUMN, - /// we can try to skip reading the handle column if the pack's handle range is fully within read range. - /// Thus, in this case, we set enable_handle_clean_read = true. + /// when the columns_to_read does not include HANDLE_COLUMN, + /// we can try to skip reading the handle column if the pack's handle range is fully within read range. + /// Thus, in this case, we set enable_handle_clean_read = true. /// 2. Del Column Optimization: /// when the columns_to_read does not include TAG_COLUMN, /// we can try to skip reading the del column if the pack has no deleted rows. From b7a9a74d2f81bf51be2a335e1964354506f41637 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Fri, 30 Sep 2022 21:06:41 +0800 Subject: [PATCH 06/12] Update dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h Co-authored-by: Wenxuan --- .../Storages/DeltaMerge/SegmentReadTaskPool.h | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 3be848098ba..2d9884b6f9a 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -113,9 +113,22 @@ class BlockStat enum class ReadMode { - Normal, // Read in Normal Mode. - Fast, // Read in Fast Mode, read data without mvcc and sorted merge, but filter the data whose del_mark = 1. - Raw, // Just for 'selraw * ', always using in debug code. + /** + * Read in normal mode. Data is ordered by PK, and only the most recent version is returned. + */ + Normal, + + /** + * Read in fast mode. Data is not sort merged, and all versions are returned. However, deleted records (del_mark=1) + * will be still filtered out. + */ + Fast, + + /** + * Read in raw mode, for example, for statements like `SELRAW *`. In raw mode, data is not sort merged and all versions + * are just returned. + */ + Raw, }; class SegmentReadTaskPool : private boost::noncopyable From 768f657fd00bb7407d7a8e5482a18118928af9eb Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 12 Oct 2022 17:58:01 +0800 Subject: [PATCH 07/12] add some comments --- dbms/src/Storages/DeltaMerge/Segment.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index e95df5a18d6..aa772f279dc 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -572,9 +572,11 @@ BlockInputStreamPtr Segment::getInputStreamFast( BlockInputStreamPtr delta_stream = std::make_shared(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range); + // Do row key filtering based on data_ranges. delta_stream = std::make_shared>(delta_stream, data_ranges, 0); stable_stream = std::make_shared>(stable_stream, data_ranges, 0); + // Filter the unneeded column and filter out the rows whose del_mark is true. delta_stream = std::make_shared(delta_stream, columns_to_read, dm_context.tracing_id); stable_stream = std::make_shared(stable_stream, columns_to_read, dm_context.tracing_id); @@ -625,9 +627,11 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, BlockInputStreamPtr delta_stream = std::make_shared(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range); + // Do row key filtering based on data_ranges. delta_stream = std::make_shared>(delta_stream, data_ranges, 0); stable_stream = std::make_shared>(stable_stream, data_ranges, 0); + // Filter the unneeded columns. delta_stream = std::make_shared(delta_stream, columns_to_read); stable_stream = std::make_shared(stable_stream, columns_to_read); From 8a7274f7c715edf0dce8c8936ebb4aadde50a1e6 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 12 Oct 2022 18:45:13 +0800 Subject: [PATCH 08/12] make some improvements for code and comments --- .../DeltaMerge/DMSegmentThreadInputStream.h | 35 +--------- .../DeltaMerge/SegmentReadTaskPool.cpp | 68 ++++++++++--------- .../Storages/DeltaMerge/SegmentReadTaskPool.h | 5 +- 3 files changed, 42 insertions(+), 66 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index 6bde0c25074..c2192fafecb 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -98,39 +98,8 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream LOG_FMT_DEBUG(log, "Read done"); return {}; } - - cur_segment = task->segment; - auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); - switch (read_mode) - { - case ReadMode::Normal: - cur_stream = cur_segment->getInputStream( - *dm_context, - columns_to_read, - task->read_snapshot, - task->ranges, - filter, - max_version, - block_size); - break; - case ReadMode::Fast: - cur_stream = cur_segment->getInputStreamFast( - *dm_context, - columns_to_read, - task->read_snapshot, - task->ranges, - filter, - block_size); - break; - case ReadMode::Raw: - cur_stream = cur_segment->getInputStreamRaw( - *dm_context, - columns_to_read, - task->read_snapshot, - task->ranges); - break; - } - LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo()); + buildStreamBasedOnReadMode(cur_stream, read_mode, task, dm_context, columns_to_read, filter, max_version, expected_block_size); + LOG_TRACE(log, "Start to read segment, segment={}", task->segment->simpleInfo()); } FAIL_POINT_PAUSE(FailPoints::pause_when_reading_from_dt_stream); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 27811691cb8..f4d47180e0b 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -23,6 +23,40 @@ extern const Metric DT_SegmentReadTasks; namespace DB::DM { +void buildStreamBasedOnReadMode(BlockInputStreamPtr & stream, const ReadMode & read_mode, const SegmentReadTaskPtr & task, const DMContextPtr & dm_context, const ColumnDefines & columns_to_read, const RSOperatorPtr & filter, const uint64_t max_version, const size_t expected_block_size) +{ + auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); + switch (read_mode) + { + case ReadMode::Normal: + stream = task->segment->getInputStream( + *dm_context, + columns_to_read, + task->read_snapshot, + task->ranges, + filter, + max_version, + block_size); + break; + case ReadMode::Fast: + stream = task->segment->getInputStreamFast( + *dm_context, + columns_to_read, + task->read_snapshot, + task->ranges, + filter, + block_size); + break; + case ReadMode::Raw: + stream = task->segment->getInputStreamRaw( + *dm_context, + columns_to_read, + task->read_snapshot, + task->ranges); + break; + } +} + SegmentReadTask::SegmentReadTask(const SegmentPtr & segment_, // const SegmentSnapshotPtr & read_snapshot_, const RowKeyRanges & ranges_) @@ -98,39 +132,9 @@ SegmentReadTasks SegmentReadTask::trySplitReadTasks(const SegmentReadTasks & tas BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t) { MemoryTrackerSetter setter(true, mem_tracker.get()); - auto seg = t->segment; BlockInputStreamPtr stream; - auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); - switch (read_mode) - { - case ReadMode::Normal: - stream = seg->getInputStream( - *dm_context, - columns_to_read, - t->read_snapshot, - t->ranges, - filter, - max_version, - block_size); - break; - case ReadMode::Fast: - stream = seg->getInputStreamFast( - *dm_context, - columns_to_read, - t->read_snapshot, - t->ranges, - filter, - block_size); - break; - case ReadMode::Raw: - stream = seg->getInputStreamRaw( - *dm_context, - columns_to_read, - t->read_snapshot, - t->ranges); - break; - } - LOG_FMT_DEBUG(log, "getInputStream succ, pool_id={} segment_id={}", pool_id, seg->segmentId()); + buildStreamBasedOnReadMode(stream, read_mode, t, dm_context, columns_to_read, filter, max_version, expected_block_size); + LOG_FMT_DEBUG(log, "getInputStream succ, pool_id={} segment_id={}", pool_id, t->segment->segmentId()); return stream; } diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 2d9884b6f9a..f7535cdb35b 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -19,6 +19,8 @@ #include #include +#include "Storages/DeltaMerge/Segment.h" + namespace DB { namespace DM @@ -123,7 +125,7 @@ enum class ReadMode * will be still filtered out. */ Fast, - + /** * Read in raw mode, for example, for statements like `SELRAW *`. In raw mode, data is not sort merged and all versions * are just returned. @@ -131,6 +133,7 @@ enum class ReadMode Raw, }; +void buildStreamBasedOnReadMode(BlockInputStreamPtr & stream, const ReadMode & read_mode, const SegmentReadTaskPtr & task, const DMContextPtr & dm_context, const ColumnDefines & columns_to_read, const RSOperatorPtr & filter, const uint64_t max_version, const size_t expected_block_size); class SegmentReadTaskPool : private boost::noncopyable { public: From 2a1be54b5c25f1cc94deb33115d8c595915d6336 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 13 Oct 2022 08:51:03 +0800 Subject: [PATCH 09/12] refactor --- .../DeltaMerge/DMSegmentThreadInputStream.h | 7 +- dbms/src/Storages/DeltaMerge/Segment.cpp | 62 ++++++++++---- dbms/src/Storages/DeltaMerge/Segment.h | 22 +++-- .../DeltaMerge/SegmentReadTaskPool.cpp | 37 +------- .../Storages/DeltaMerge/SegmentReadTaskPool.h | 3 - .../DeltaMerge/tests/gtest_dm_segment.cpp | 84 +++++++++---------- .../tests/gtest_dm_segment_common_handle.cpp | 52 ++++++------ .../tests/gtest_segment_replace_data.cpp | 2 +- .../tests/gtest_segment_test_basic.cpp | 4 +- 9 files changed, 139 insertions(+), 134 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index 463ce91c57a..fb57b9472ce 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -98,8 +98,11 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream LOG_DEBUG(log, "Read done"); return {}; } - buildStreamBasedOnReadMode(cur_stream, read_mode, task, dm_context, columns_to_read, filter, max_version, expected_block_size); - LOG_TRACE(log, "Start to read segment, segment={}", task->segment->simpleInfo()); + cur_segment = task->segment; + + auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); + cur_stream = task->segment->getInputStream(read_mode, *dm_context, columns_to_read, task->read_snapshot, task->ranges, filter, max_version, block_size); + LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo()); } FAIL_POINT_PAUSE(FailPoints::pause_when_reading_from_dt_stream); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 03cb2ceda8f..72bafe7e9e2 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -45,6 +45,8 @@ #include #include +#include "Storages/DeltaMerge/SegmentReadTaskPool.h" + namespace ProfileEvents { extern const Event DMWriteBlock; @@ -395,13 +397,37 @@ SegmentSnapshotPtr Segment::createSnapshot(const DMContext & dm_context, bool fo return std::make_shared(std::move(delta_snap), std::move(stable_snap)); } -BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, + +BlockInputStreamPtr Segment::getInputStream(const ReadMode & read_mode, + const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, UInt64 max_version, size_t expected_block_size) +{ + switch (read_mode) + { + case ReadMode::Normal: + return getInputStreamModeNormal(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size); + break; + case ReadMode::Fast: + return getInputStreamModeFast(dm_context, columns_to_read, segment_snap, read_ranges, filter, expected_block_size); + break; + case ReadMode::Raw: + return getInputStreamModeRaw(dm_context, columns_to_read, segment_snap, read_ranges, expected_block_size); + break; + } +} + +BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size) { LOG_TRACE(log, "Begin segment create input stream"); @@ -479,17 +505,17 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, return stream; } -BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, - const ColumnDefines & columns_to_read, - const RowKeyRanges & read_ranges, - const RSOperatorPtr & filter, - UInt64 max_version, - size_t expected_block_size) +BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size) { auto segment_snap = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfRead); if (!segment_snap) return {}; - return getInputStream(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size); + return getInputStreamModeNormal(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size); } BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_context, @@ -527,11 +553,11 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co return data_stream; } -/// We call getInputStreamFast when we read in fast mode. +/// We call getInputStreamModeFast when we read in fast mode. /// In this case, we will read all the data in delta and stable, and then merge them without sorting. /// Besides, we will do del_mark != 0 filtering to drop the deleted rows. /// In conclusion, the output is unsorted, and does not do mvcc filtering. -BlockInputStreamPtr Segment::getInputStreamFast( +BlockInputStreamPtr Segment::getInputStreamModeFast( const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, @@ -618,13 +644,13 @@ BlockInputStreamPtr Segment::getInputStreamFast( return std::make_shared(streams, dm_context.tracing_id); } -/// We call getInputStreamRaw in 'selraw xxxx' statement, which is always in test for debug. +/// We call getInputStreamModeRaw in 'selraw xxxx' statement, which is always in test for debug. /// In this case, we will read all the data without mvcc filtering and sorted merge. -BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, - const ColumnDefines & columns_to_read, - const SegmentSnapshotPtr & segment_snap, - const RowKeyRanges & data_ranges, - size_t expected_block_size) +BlockInputStreamPtr Segment::getInputStreamModeRaw(const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & data_ranges, + size_t expected_block_size) { auto new_columns_to_read = std::make_shared(); @@ -673,12 +699,12 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, return std::make_shared(streams, dm_context.tracing_id); } -BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read) +BlockInputStreamPtr Segment::getInputStreamModeRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read) { auto segment_snap = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfReadRaw); if (!segment_snap) return {}; - return getInputStreamRaw(dm_context, columns_to_read, segment_snap, {rowkey_range}); + return getInputStreamModeRaw(dm_context, columns_to_read, segment_snap, {rowkey_range}); } SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & schema_snap) const diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index b8d4ff105eb..46d91a0ca82 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -28,6 +28,8 @@ #include #include +#include "Storages/DeltaMerge/SegmentReadTaskPool.h" + namespace DB::DM { class Segment; @@ -155,7 +157,7 @@ class Segment : private boost::noncopyable SegmentSnapshotPtr createSnapshot(const DMContext & dm_context, bool for_update, CurrentMetrics::Metric metric) const; - BlockInputStreamPtr getInputStream( + BlockInputStreamPtr getInputStreamModeNormal( const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, @@ -164,7 +166,7 @@ class Segment : private boost::noncopyable UInt64 max_version, size_t expected_block_size); - BlockInputStreamPtr getInputStream( + BlockInputStreamPtr getInputStreamModeNormal( const DMContext & dm_context, const ColumnDefines & columns_to_read, const RowKeyRanges & read_ranges, @@ -182,7 +184,17 @@ class Segment : private boost::noncopyable size_t expected_block_size = DEFAULT_BLOCK_SIZE, bool reorganize_block = true) const; - BlockInputStreamPtr getInputStreamFast( + BlockInputStreamPtr getInputStream( + const ReadMode & read_mode, + const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size); + + BlockInputStreamPtr getInputStreamModeFast( const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, @@ -190,14 +202,14 @@ class Segment : private boost::noncopyable const RSOperatorPtr & filter, size_t expected_block_size = DEFAULT_BLOCK_SIZE); - BlockInputStreamPtr getInputStreamRaw( + BlockInputStreamPtr getInputStreamModeRaw( const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & data_ranges, size_t expected_block_size = DEFAULT_BLOCK_SIZE); - BlockInputStreamPtr getInputStreamRaw( + BlockInputStreamPtr getInputStreamModeRaw( const DMContext & dm_context, const ColumnDefines & columns_to_read); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index b5ea3c9b5b0..3a12371e297 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -23,40 +23,6 @@ extern const Metric DT_SegmentReadTasks; namespace DB::DM { -void buildStreamBasedOnReadMode(BlockInputStreamPtr & stream, const ReadMode & read_mode, const SegmentReadTaskPtr & task, const DMContextPtr & dm_context, const ColumnDefines & columns_to_read, const RSOperatorPtr & filter, const uint64_t max_version, const size_t expected_block_size) -{ - auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); - switch (read_mode) - { - case ReadMode::Normal: - stream = task->segment->getInputStream( - *dm_context, - columns_to_read, - task->read_snapshot, - task->ranges, - filter, - max_version, - block_size); - break; - case ReadMode::Fast: - stream = task->segment->getInputStreamFast( - *dm_context, - columns_to_read, - task->read_snapshot, - task->ranges, - filter, - block_size); - break; - case ReadMode::Raw: - stream = task->segment->getInputStreamRaw( - *dm_context, - columns_to_read, - task->read_snapshot, - task->ranges); - break; - } -} - SegmentReadTask::SegmentReadTask(const SegmentPtr & segment_, // const SegmentSnapshotPtr & read_snapshot_, const RowKeyRanges & ranges_) @@ -133,7 +99,8 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t { MemoryTrackerSetter setter(true, mem_tracker.get()); BlockInputStreamPtr stream; - buildStreamBasedOnReadMode(stream, read_mode, t, dm_context, columns_to_read, filter, max_version, expected_block_size); + auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); + stream = t->segment->getInputStream(read_mode, *dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, max_version, block_size); LOG_DEBUG(log, "getInputStream succ, pool_id={} segment_id={}", pool_id, t->segment->segmentId()); return stream; } diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 205a9951d2e..985caf35156 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -19,8 +19,6 @@ #include #include -#include "Storages/DeltaMerge/Segment.h" - namespace DB { namespace DM @@ -133,7 +131,6 @@ enum class ReadMode Raw, }; -void buildStreamBasedOnReadMode(BlockInputStreamPtr & stream, const ReadMode & read_mode, const SegmentReadTaskPtr & task, const DMContextPtr & dm_context, const ColumnDefines & columns_to_read, const RSOperatorPtr & filter, const uint64_t max_version, const size_t expected_block_size); class SegmentReadTaskPool : private boost::noncopyable { public: diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index f17f90d46fc..d6dacd90e5c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -138,7 +138,7 @@ try { // Round 1 { // read written data (only in delta) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -149,7 +149,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } } @@ -165,7 +165,7 @@ try { // Round 2 { // read written data (both in delta and stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write + num_rows_write_2); } @@ -176,7 +176,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write + num_rows_write_2); } } @@ -206,7 +206,7 @@ try } { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); // only write two visible pks ASSERT_INPUTSTREAM_NROWS(in, 2); } @@ -242,7 +242,7 @@ try { // Round 1 { // read written data (only in delta) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows); } @@ -255,7 +255,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows); } } @@ -273,7 +273,7 @@ try { // Round 2 { // read written data (both in delta and stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows_2); } @@ -285,7 +285,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows_2); } } @@ -317,19 +317,19 @@ try // Thread A write_rows(100); ASSERT_INPUTSTREAM_NROWS( - segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}), + segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}), 100); auto snap = segment->createSnapshot(dmContext(), false, CurrentMetrics::DT_SnapshotOfRead); // Thread B write_rows(100); ASSERT_INPUTSTREAM_NROWS( - segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}), + segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}), 200); // Thread A { - auto in = segment->getInputStream( + auto in = segment->getInputStreamModeNormal( dmContext(), *tableColumns(), snap, @@ -365,7 +365,7 @@ try } auto get_rows = [&](const RowKeyRange & range) { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {range}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {range}); return getInputStreamNRows(in); }; @@ -435,7 +435,7 @@ try if (read_before_delete) { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -456,7 +456,7 @@ try { // read after delete range - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -467,7 +467,7 @@ try if (merge_delta_after_delete) { // read raw after delete range - auto in = segment->getInputStreamRaw(dmContext(), *tableColumns()); + auto in = segment->getInputStreamModeRaw(dmContext(), *tableColumns()); // Only 2 rows are left on disk, others are compacted. ASSERT_INPUTSTREAM_NROWS(in, 2); } @@ -488,7 +488,7 @@ try if (read_before_delete) { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -517,7 +517,7 @@ try { // read after delete range - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -528,7 +528,7 @@ try if (merge_delta_after_delete) { // read raw after delete range - auto in = segment->getInputStreamRaw(dmContext(), *tableColumns()); + auto in = segment->getInputStreamModeRaw(dmContext(), *tableColumns()); // Only 2 rows are left on disk, others are compacted. ASSERT_INPUTSTREAM_NROWS(in, 2); } @@ -558,7 +558,7 @@ try if (read_before_delete) { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -579,7 +579,7 @@ try { // read after delete range - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -590,7 +590,7 @@ try if (merge_delta_after_delete) { // read raw after delete range - auto in = segment->getInputStreamRaw(dmContext(), *tableColumns()); + auto in = segment->getInputStreamModeRaw(dmContext(), *tableColumns()); // Only 2 rows are left on disk, others are compacted. ASSERT_INPUTSTREAM_NROWS(in, 2); } @@ -633,7 +633,7 @@ try { // Read after deletion // The deleted range has no overlap with current data, so there should be no change - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -653,7 +653,7 @@ try { // Read after deletion // The deleted range has overlap range [63, 64) with current data, so the record with Handle 63 should be deleted - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -671,7 +671,7 @@ try { // Read after deletion - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); std::vector pk_coldata{0}; auto tmp = createNumbers(32, 63); @@ -694,7 +694,7 @@ try { // Read after deletion - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); std::vector pk_coldata{0}; auto tmp = createNumbers(32, 63); pk_coldata.insert(pk_coldata.end(), tmp.begin(), tmp.end()); @@ -716,7 +716,7 @@ try { // Read after deletion - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); std::vector pk_coldata = createNumbers(32, 63); ASSERT_INPUTSTREAM_COLS_UR( in, @@ -734,7 +734,7 @@ try { // Read after new write - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); std::vector pk_coldata = createNumbers(9, 16); auto tmp = createNumbers(32, 63); pk_coldata.insert(pk_coldata.end(), tmp.begin(), tmp.end()); @@ -765,7 +765,7 @@ try { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -784,8 +784,8 @@ try EXPECT_EQ(*s2_range.end.value, *old_range.end.value); // TODO check segment epoch is increase - size_t num_rows_seg1 = getInputStreamNRows(segment->getInputStream(dmContext(), *tableColumns(), {segment->getRowKeyRange()})); - size_t num_rows_seg2 = getInputStreamNRows(new_segment->getInputStream(dmContext(), *tableColumns(), {new_segment->getRowKeyRange()})); + size_t num_rows_seg1 = getInputStreamNRows(segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {segment->getRowKeyRange()})); + size_t num_rows_seg2 = getInputStreamNRows(new_segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {new_segment->getRowKeyRange()})); ASSERT_EQ(num_rows_seg1 + num_rows_seg2, num_rows_write); // delete rows in the right segment @@ -805,7 +805,7 @@ try // TODO check segment epoch is increase } { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_seg1); } } @@ -839,8 +839,8 @@ try // If they are equal, result will be true, otherwise it will be false. auto compare = [&](const SegmentPtr & seg1, const SegmentPtr & seg2, bool & result) { result = false; - auto in1 = seg1->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); - auto in2 = seg2->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in1 = seg1->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in2 = seg2->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); in1->readPrefix(); in2->readPrefix(); for (;;) @@ -972,7 +972,7 @@ try { // Read after writing - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -1106,7 +1106,7 @@ try }; auto read_rows = [&](const SegmentPtr & segment) { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); return getInputStreamNRows(in); }; @@ -1249,7 +1249,7 @@ try { // read written data - auto in = segment->getInputStream(dmContext(), *columns_to_read, {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *columns_to_read, {RowKeyRange::newAll(false, 1)}); // check that we can read correct values ASSERT_INPUTSTREAM_COLS_UR( in, @@ -1281,7 +1281,7 @@ try { // read written data - auto in = segment->getInputStream(dmContext(), *columns_to_read, {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *columns_to_read, {RowKeyRange::newAll(false, 1)}); // check that we can read correct values // [0, 50) is the old signed values, [50, 100) is replaced by newer written values @@ -1307,7 +1307,7 @@ try { // check the stable data with new schema - auto in = segment->getInputStream(dmContext(), *columns_to_read, {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *columns_to_read, {RowKeyRange::newAll(false, 1)}); // check that we can read correct values std::vector i32_columndata = createSignedNumbers(0, num_rows_write / 2); @@ -1372,7 +1372,7 @@ try { // read written data - auto in = segment->getInputStream(dmContext(), *columns_after_ddl, {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *columns_after_ddl, {RowKeyRange::newAll(false, 1)}); // check that we can read correct values ASSERT_INPUTSTREAM_COLS_UR( @@ -1407,7 +1407,7 @@ try { // read written data - auto in = segment->getInputStream(dmContext(), *columns_after_ddl, {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *columns_after_ddl, {RowKeyRange::newAll(false, 1)}); // check that we can read correct values // First 50 values are default value @@ -1434,7 +1434,7 @@ try { // read written data after delta-merge - auto in = segment->getInputStream(dmContext(), *columns_after_ddl, {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *columns_after_ddl, {RowKeyRange::newAll(false, 1)}); // check that we can read correct values // First 50 values are default value diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp index dfab80df230..864fdb73b39 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp @@ -129,7 +129,7 @@ try { // Round 1 { // read written data (only in delta) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -140,7 +140,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } } @@ -164,7 +164,7 @@ try { // Round 2 { // read written data (both in delta and stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write + num_rows_write_2); } @@ -175,7 +175,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write + num_rows_write_2); } } @@ -220,7 +220,7 @@ try { // Round 1 { // read written data (only in delta) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows); } @@ -233,7 +233,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows); } } @@ -259,7 +259,7 @@ try { // Round 2 { // read written data (both in delta and stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows_2); } @@ -271,7 +271,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows_2); } } @@ -306,7 +306,7 @@ try if (read_before_delete) { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -325,7 +325,7 @@ try { // read after delete range - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); const size_t nrows_after_delete = 2; // mock common handle auto common_handle_coldata = [this]() { @@ -367,7 +367,7 @@ try if (read_before_delete) { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -394,7 +394,7 @@ try { // read after delete range - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); const size_t nrows_after_delete = 2; // mock common handle auto common_handle_coldata = [this]() { @@ -453,7 +453,7 @@ try if (read_before_delete) { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -472,7 +472,7 @@ try { // read after delete range - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); const size_t nrows_after_delete = 2; // mock common handle auto common_handle_coldata = [this]() { @@ -529,7 +529,7 @@ try { // Read after deletion // The deleted range has no overlap with current data, so there should be no change - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); // mock common handle auto common_handle_coldata = [this]() { auto tmp = createNumbers(0, num_rows_write); @@ -557,7 +557,7 @@ try { // Read after deletion // The deleted range has overlap range [63, 64) with current data, so the record with Handle 63 should be deleted - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); // mock common handle auto common_handle_coldata = [this]() { std::vector int_coldata = createNumbers(0, 63); @@ -584,7 +584,7 @@ try { // Read after deletion - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); // mock common handle auto common_handle_coldata = [this]() { // the result should be [0, 32,33,34,...62] @@ -614,7 +614,7 @@ try { // Read after deletion - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); // mock common handle auto common_handle_coldata = [this]() { std::vector int_coldata{0}; @@ -643,7 +643,7 @@ try { // Read after deletion - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); // mock common handle auto common_handle_coldata = [this]() { std::vector int_coldata = createNumbers(32, 63); @@ -681,7 +681,7 @@ try { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -700,8 +700,8 @@ try EXPECT_EQ(*s2_range.end.value, *old_range.end.value); // TODO check segment epoch is increase - size_t num_rows_seg1 = getInputStreamNRows(segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)})); - size_t num_rows_seg2 = getInputStreamNRows(segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)})); + size_t num_rows_seg1 = getInputStreamNRows(segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)})); + size_t num_rows_seg2 = getInputStreamNRows(segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)})); ASSERT_EQ(num_rows_seg1 + num_rows_seg2, num_rows_write); // merge segments @@ -714,7 +714,7 @@ try // TODO check segment epoch is increase } { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } } @@ -727,8 +727,8 @@ try // If they are equal, result will be true, otherwise it will be false. auto compare = [&](const SegmentPtr & seg1, const SegmentPtr & seg2, bool & result) { result = false; - auto in1 = seg1->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); - auto in2 = seg2->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in1 = seg1->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in2 = seg2->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); in1->readPrefix(); in2->readPrefix(); for (;;) @@ -874,7 +874,7 @@ try { // Read after writing - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_EQ(temp.size(), num_batches_written * (num_rows_per_write - 2)); // mock common handle auto common_handle_coldata = [this, &temp]() { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp index 8a09cc87594..16c6c54d7bf 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp @@ -459,7 +459,7 @@ try writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 41); // 41 in memtable auto segment = segments[DELTA_MERGE_FIRST_SEGMENT_ID]; - auto in_stream = segment->getInputStreamRaw(*dm_context, *tableColumns()); + auto in_stream = segment->getInputStreamModeRaw(*dm_context, *tableColumns()); // Now let's replace data. auto block = prepareWriteBlock(0, 233); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 66e885532dc..2078ee7f2b0 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -64,7 +64,7 @@ size_t SegmentTestBasic::getSegmentRowNumWithoutMVCC(PageId segment_id) { RUNTIME_CHECK(segments.find(segment_id) != segments.end()); auto segment = segments[segment_id]; - auto in = segment->getInputStreamRaw(*dm_context, *tableColumns()); + auto in = segment->getInputStreamModeRaw(*dm_context, *tableColumns()); return getInputStreamNRows(in); } @@ -72,7 +72,7 @@ size_t SegmentTestBasic::getSegmentRowNum(PageId segment_id) { RUNTIME_CHECK(segments.find(segment_id) != segments.end()); auto segment = segments[segment_id]; - auto in = segment->getInputStream(*dm_context, *tableColumns(), {segment->getRowKeyRange()}); + auto in = segment->getInputStreamModeNormal(*dm_context, *tableColumns(), {segment->getRowKeyRange()}); return getInputStreamNRows(in); } From 58ec367cb713babfeeb138867a20e89b5fce842d Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 13 Oct 2022 08:54:45 +0800 Subject: [PATCH 10/12] refactor --- dbms/src/Storages/DeltaMerge/Segment.h | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 46d91a0ca82..bc36f2aa057 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -157,6 +157,16 @@ class Segment : private boost::noncopyable SegmentSnapshotPtr createSnapshot(const DMContext & dm_context, bool for_update, CurrentMetrics::Metric metric) const; + BlockInputStreamPtr getInputStream( + const ReadMode & read_mode, + const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size); + BlockInputStreamPtr getInputStreamModeNormal( const DMContext & dm_context, const ColumnDefines & columns_to_read, @@ -184,16 +194,6 @@ class Segment : private boost::noncopyable size_t expected_block_size = DEFAULT_BLOCK_SIZE, bool reorganize_block = true) const; - BlockInputStreamPtr getInputStream( - const ReadMode & read_mode, - const DMContext & dm_context, - const ColumnDefines & columns_to_read, - const SegmentSnapshotPtr & segment_snap, - const RowKeyRanges & read_ranges, - const RSOperatorPtr & filter, - UInt64 max_version, - size_t expected_block_size); - BlockInputStreamPtr getInputStreamModeFast( const DMContext & dm_context, const ColumnDefines & columns_to_read, From 4b2530e2ec9278d2514c4b134754c3ffc29d8aa4 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 13 Oct 2022 09:31:29 +0800 Subject: [PATCH 11/12] fix typo --- dbms/src/Storages/DeltaMerge/Segment.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index bc36f2aa057..fe26cb68eac 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -23,13 +23,12 @@ #include #include #include +#include #include #include #include #include -#include "Storages/DeltaMerge/SegmentReadTaskPool.h" - namespace DB::DM { class Segment; From 0320e015e4100114eefabb522b7a9e50002009e3 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Thu, 13 Oct 2022 09:34:28 +0800 Subject: [PATCH 12/12] fix typo --- dbms/src/Storages/DeltaMerge/Segment.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 72bafe7e9e2..022e7537c7e 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -45,8 +45,6 @@ #include #include -#include "Storages/DeltaMerge/SegmentReadTaskPool.h" - namespace ProfileEvents { extern const Event DMWriteBlock;