diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index 2438f881a81..fb57b9472ce 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -47,8 +47,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream const RSOperatorPtr & filter_, UInt64 max_version_, size_t expected_block_size_, - bool is_raw_, - bool do_delete_mark_filter_for_raw_, + ReadMode read_mode_, const int extra_table_id_index, const TableID physical_table_id, const String & req_id) @@ -60,8 +59,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream , header(toEmptyBlock(columns_to_read)) , max_version(max_version_) , expected_block_size(expected_block_size_) - , is_raw(is_raw_) - , do_delete_mark_filter_for_raw(do_delete_mark_filter_for_raw_) + , read_mode(read_mode_) , extra_table_id_index(extra_table_id_index) , physical_table_id(physical_table_id) , log(Logger::get(NAME, req_id)) @@ -100,29 +98,10 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream LOG_DEBUG(log, "Read done"); return {}; } - cur_segment = task->segment; - if (is_raw) - { - cur_stream = cur_segment->getInputStreamRaw( - *dm_context, - columns_to_read, - task->read_snapshot, - task->ranges, - filter, - do_delete_mark_filter_for_raw); - } - else - { - cur_stream = cur_segment->getInputStream( - *dm_context, - columns_to_read, - task->read_snapshot, - task->ranges, - filter, - max_version, - std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows))); - } + + auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); + cur_stream = task->segment->getInputStream(read_mode, *dm_context, columns_to_read, task->read_snapshot, task->ranges, filter, max_version, block_size); LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo()); } FAIL_POINT_PAUSE(FailPoints::pause_when_reading_from_dt_stream); @@ -172,8 +151,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream Block header; const UInt64 max_version; const size_t expected_block_size; - const bool is_raw; - const bool do_delete_mark_filter_for_raw; + const ReadMode read_mode; // position of the ExtraPhysTblID column in column_names parameter in the StorageDeltaMerge::read function. const int extra_table_id_index; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 48d055b7e57..732287338d0 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -851,7 +851,7 @@ void DeltaMergeStore::compact(const Context & db_context, const RowKeyRange & ra } } -// Read data without mvcc filtering && delete mark != 0 filtering. +// Read data without mvcc filtering. // just for debug // readRaw is called under 'selraw xxxx' BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, @@ -908,8 +908,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, EMPTY_FILTER, std::numeric_limits::max(), DEFAULT_BLOCK_SIZE, - /* is_raw = */ true, - /* do_delete_mark_filter_for_raw = */ false, + /* read_mode */ ReadMode::Raw, std::move(tasks), after_segment_read, req_info); @@ -937,8 +936,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, EMPTY_FILTER, std::numeric_limits::max(), DEFAULT_BLOCK_SIZE, - /* is_raw_ */ true, - /* do_delete_mark_filter_for_raw_ */ false, // don't do filter based on del_mark = 1 + /* read_mode */ ReadMode::Raw, extra_table_id_index, physical_table_id, req_info); @@ -992,8 +990,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, filter, max_version, expected_block_size, - /* is_raw = */ is_fast_scan, - /* do_delete_mark_filter_for_raw = */ is_fast_scan, + /* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal, std::move(tasks), after_segment_read, log_tracing_id); @@ -1021,8 +1018,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, filter, max_version, expected_block_size, - /* is_raw_= */ is_fast_scan, - /* do_delete_mark_filter_for_raw_= */ is_fast_scan, + /* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal, extra_table_id_index, physical_table_id, log_tracing_id); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 4f394c13329..022e7537c7e 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -395,13 +395,37 @@ SegmentSnapshotPtr Segment::createSnapshot(const DMContext & dm_context, bool fo return std::make_shared(std::move(delta_snap), std::move(stable_snap)); } -BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, + +BlockInputStreamPtr Segment::getInputStream(const ReadMode & read_mode, + const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, UInt64 max_version, size_t expected_block_size) +{ + switch (read_mode) + { + case ReadMode::Normal: + return getInputStreamModeNormal(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size); + break; + case ReadMode::Fast: + return getInputStreamModeFast(dm_context, columns_to_read, segment_snap, read_ranges, filter, expected_block_size); + break; + case ReadMode::Raw: + return getInputStreamModeRaw(dm_context, columns_to_read, segment_snap, read_ranges, expected_block_size); + break; + } +} + +BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size) { LOG_TRACE(log, "Begin segment create input stream"); @@ -479,17 +503,17 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, return stream; } -BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, - const ColumnDefines & columns_to_read, - const RowKeyRanges & read_ranges, - const RSOperatorPtr & filter, - UInt64 max_version, - size_t expected_block_size) +BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size) { auto segment_snap = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfRead); if (!segment_snap) return {}; - return getInputStream(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size); + return getInputStreamModeNormal(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size); } BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_context, @@ -527,64 +551,58 @@ BlockInputStreamPtr Segment::getInputStreamForDataExport(const DMContext & dm_co return data_stream; } -/// we will call getInputStreamRaw in two condition: -/// 1. Using 'selraw xxxx' statement, which is always in test for debug. (when filter_delete_mark = false) -/// In this case, we will read all the data without mvcc filtering, -/// del_mark != 0 filtering and sorted merge. -/// We will just read all the data and return. -/// 2. We read in fast mode. (when filter_delete_mark = true) -/// In this case, we will read all the data without mvcc filtering and sorted merge, -/// but we will do del_mark != 0 filtering. -BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, - const ColumnDefines & columns_to_read, - const SegmentSnapshotPtr & segment_snap, - const RowKeyRanges & data_ranges, - const RSOperatorPtr & filter, - bool filter_delete_mark, - size_t expected_block_size) +/// We call getInputStreamModeFast when we read in fast mode. +/// In this case, we will read all the data in delta and stable, and then merge them without sorting. +/// Besides, we will do del_mark != 0 filtering to drop the deleted rows. +/// In conclusion, the output is unsorted, and does not do mvcc filtering. +BlockInputStreamPtr Segment::getInputStreamModeFast( + const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & data_ranges, + const RSOperatorPtr & filter, + size_t expected_block_size) { - /// Now, we use filter_delete_mark to determine whether it is in fast mode or just from `selraw * xxxx` - /// But this way seems not to be robustness enough, maybe we need another flag? auto new_columns_to_read = std::make_shared(); // new_columns_to_read need at most columns_to_read.size() + 2, due to may extra insert into the handle column and del_mark column. new_columns_to_read->reserve(columns_to_read.size() + 2); new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle)); - if (filter_delete_mark) - { - new_columns_to_read->push_back(getTagColumnDefine()); - } - - bool enable_handle_clean_read = filter_delete_mark; - bool enable_del_clean_read = filter_delete_mark; + new_columns_to_read->push_back(getTagColumnDefine()); + + /// When we read in fast mode, we can try to do the following optimization: + /// 1. Handle Column Optimization: + /// when the columns_to_read does not include HANDLE_COLUMN, + /// we can try to skip reading the handle column if the pack's handle range is fully within read range. + /// Thus, in this case, we set enable_handle_clean_read = true. + /// 2. Del Column Optimization: + /// when the columns_to_read does not include TAG_COLUMN, + /// we can try to skip reading the del column if the pack has no deleted rows. + /// Thus, in this case, we set enable_del_clean_read = true. + /// 3. Version Column Optimization: + /// if the columns_to_read does not include VERSION_COLUMN, + /// we don't need to read version column, thus we don't force push version column into new_columns_to_read. + + bool enable_handle_clean_read = true; + bool enable_del_clean_read = true; for (const auto & c : columns_to_read) { - if (c.id != EXTRA_HANDLE_COLUMN_ID) + if (c.id == EXTRA_HANDLE_COLUMN_ID) { - if (filter_delete_mark && c.id == TAG_COLUMN_ID) - { - enable_del_clean_read = false; - } - else - { - new_columns_to_read->push_back(c); - } + enable_handle_clean_read = false; + } + else if (c.id == TAG_COLUMN_ID) + { + enable_del_clean_read = false; } else { - enable_handle_clean_read = false; + new_columns_to_read->push_back(c); } } - /// when we read in fast mode, if columns_to_read does not include EXTRA_HANDLE_COLUMN_ID, - /// we can try to use clean read to make optimization in stable part. - /// when the pack is under totally data_ranges and has no rows whose del_mark = 1 --> we don't need read handle_column/tag_column/version_column - /// when the pack is under totally data_ranges and has rows whose del_mark = 1 --> we don't need read handle_column/version_column - /// others --> we don't need read version_column - /// Thus, in fast mode, if we don't need read handle_column, we set enable_handle_clean_read as true. - /// If we don't need read del_column, we set enable_del_clean_read as true BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream( dm_context, *new_columns_to_read, @@ -593,25 +611,73 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, std::numeric_limits::max(), expected_block_size, /* enable_handle_clean_read */ enable_handle_clean_read, - /* is_fast_scan */ filter_delete_mark, + /* is_fast_scan */ true, /* enable_del_clean_read */ enable_del_clean_read); BlockInputStreamPtr delta_stream = std::make_shared(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range); + // Do row key filtering based on data_ranges. delta_stream = std::make_shared>(delta_stream, data_ranges, 0); stable_stream = std::make_shared>(stable_stream, data_ranges, 0); - if (filter_delete_mark) + // Filter the unneeded column and filter out the rows whose del_mark is true. + delta_stream = std::make_shared(delta_stream, columns_to_read, dm_context.tracing_id); + stable_stream = std::make_shared(stable_stream, columns_to_read, dm_context.tracing_id); + + BlockInputStreams streams; + + if (dm_context.read_delta_only) { - delta_stream = std::make_shared(delta_stream, columns_to_read, dm_context.tracing_id); - stable_stream = std::make_shared(stable_stream, columns_to_read, dm_context.tracing_id); + streams.push_back(delta_stream); + } + else if (dm_context.read_stable_only) + { + streams.push_back(stable_stream); } else { - delta_stream = std::make_shared(delta_stream, columns_to_read); - stable_stream = std::make_shared(stable_stream, columns_to_read); + streams.push_back(delta_stream); + streams.push_back(stable_stream); } + return std::make_shared(streams, dm_context.tracing_id); +} + +/// We call getInputStreamModeRaw in 'selraw xxxx' statement, which is always in test for debug. +/// In this case, we will read all the data without mvcc filtering and sorted merge. +BlockInputStreamPtr Segment::getInputStreamModeRaw(const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & data_ranges, + size_t expected_block_size) +{ + auto new_columns_to_read = std::make_shared(); + + new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle)); + + for (const auto & c : columns_to_read) + { + if (c.id != EXTRA_HANDLE_COLUMN_ID) + new_columns_to_read->push_back(c); + } + + BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream( + dm_context, + *new_columns_to_read, + data_ranges, + EMPTY_FILTER, + std::numeric_limits::max(), + expected_block_size, + /* enable_handle_clean_read */ false); + + BlockInputStreamPtr delta_stream = std::make_shared(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range); + + // Do row key filtering based on data_ranges. + delta_stream = std::make_shared>(delta_stream, data_ranges, 0); + stable_stream = std::make_shared>(stable_stream, data_ranges, 0); + // Filter the unneeded columns. + delta_stream = std::make_shared(delta_stream, columns_to_read); + stable_stream = std::make_shared(stable_stream, columns_to_read); BlockInputStreams streams; @@ -631,12 +697,12 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, return std::make_shared(streams, dm_context.tracing_id); } -BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read, bool filter_delete_mark) +BlockInputStreamPtr Segment::getInputStreamModeRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read) { auto segment_snap = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfReadRaw); if (!segment_snap) return {}; - return getInputStreamRaw(dm_context, columns_to_read, segment_snap, {rowkey_range}, EMPTY_FILTER, filter_delete_mark); + return getInputStreamModeRaw(dm_context, columns_to_read, segment_snap, {rowkey_range}); } SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr & schema_snap) const diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index d3d81fd3536..fe26cb68eac 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -156,6 +157,7 @@ class Segment : private boost::noncopyable SegmentSnapshotPtr createSnapshot(const DMContext & dm_context, bool for_update, CurrentMetrics::Metric metric) const; BlockInputStreamPtr getInputStream( + const ReadMode & read_mode, const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, @@ -164,7 +166,16 @@ class Segment : private boost::noncopyable UInt64 max_version, size_t expected_block_size); - BlockInputStreamPtr getInputStream( + BlockInputStreamPtr getInputStreamModeNormal( + const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size); + + BlockInputStreamPtr getInputStreamModeNormal( const DMContext & dm_context, const ColumnDefines & columns_to_read, const RowKeyRanges & read_ranges, @@ -182,19 +193,24 @@ class Segment : private boost::noncopyable size_t expected_block_size = DEFAULT_BLOCK_SIZE, bool reorganize_block = true) const; - BlockInputStreamPtr getInputStreamRaw( + BlockInputStreamPtr getInputStreamModeFast( const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & data_ranges, const RSOperatorPtr & filter, - bool filter_delete_mark = true, size_t expected_block_size = DEFAULT_BLOCK_SIZE); - BlockInputStreamPtr getInputStreamRaw( + BlockInputStreamPtr getInputStreamModeRaw( const DMContext & dm_context, const ColumnDefines & columns_to_read, - bool filter_delete_mark = false); + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & data_ranges, + size_t expected_block_size = DEFAULT_BLOCK_SIZE); + + BlockInputStreamPtr getInputStreamModeRaw( + const DMContext & dm_context, + const ColumnDefines & columns_to_read); /// For those split, merge and mergeDelta methods, we should use prepareXXX/applyXXX combo in real production. /// split(), merge() and mergeDelta() are only used in test cases. diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index fcfc3056419..3a12371e297 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -98,18 +98,10 @@ SegmentReadTasks SegmentReadTask::trySplitReadTasks(const SegmentReadTasks & tas BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t) { MemoryTrackerSetter setter(true, mem_tracker.get()); - auto seg = t->segment; BlockInputStreamPtr stream; - if (is_raw) - { - stream = seg->getInputStreamRaw(*dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, do_range_filter_for_raw); - } - else - { - auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); - stream = seg->getInputStream(*dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, max_version, block_size); - } - LOG_DEBUG(log, "getInputStream succ, pool_id={} segment_id={}", pool_id, seg->segmentId()); + auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); + stream = t->segment->getInputStream(read_mode, *dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, max_version, block_size); + LOG_DEBUG(log, "getInputStream succ, pool_id={} segment_id={}", pool_id, t->segment->segmentId()); return stream; } diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 930515123fe..985caf35156 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -111,6 +111,26 @@ class BlockStat std::atomic total_bytes; }; +enum class ReadMode +{ + /** + * Read in normal mode. Data is ordered by PK, and only the most recent version is returned. + */ + Normal, + + /** + * Read in fast mode. Data is not sort merged, and all versions are returned. However, deleted records (del_mark=1) + * will be still filtered out. + */ + Fast, + + /** + * Read in raw mode, for example, for statements like `SELRAW *`. In raw mode, data is not sort merged and all versions + * are just returned. + */ + Raw, +}; + class SegmentReadTaskPool : private boost::noncopyable { public: @@ -121,8 +141,7 @@ class SegmentReadTaskPool : private boost::noncopyable const RSOperatorPtr & filter_, uint64_t max_version_, size_t expected_block_size_, - bool is_raw_, - bool do_range_filter_for_raw_, + ReadMode read_mode_, SegmentReadTasks && tasks_, AfterSegmentRead after_segment_read_, const String & tracing_id) @@ -133,8 +152,7 @@ class SegmentReadTaskPool : private boost::noncopyable , filter(filter_) , max_version(max_version_) , expected_block_size(expected_block_size_) - , is_raw(is_raw_) - , do_range_filter_for_raw(do_range_filter_for_raw_) + , read_mode(read_mode_) , tasks(std::move(tasks_)) , after_segment_read(after_segment_read_) , log(Logger::get("SegmentReadTaskPool", tracing_id)) @@ -214,8 +232,7 @@ class SegmentReadTaskPool : private boost::noncopyable RSOperatorPtr filter; const uint64_t max_version; const size_t expected_block_size; - const bool is_raw; - const bool do_range_filter_for_raw; + const ReadMode read_mode; SegmentReadTasks tasks; AfterSegmentRead after_segment_read; std::mutex mutex; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index f17f90d46fc..d6dacd90e5c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -138,7 +138,7 @@ try { // Round 1 { // read written data (only in delta) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -149,7 +149,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } } @@ -165,7 +165,7 @@ try { // Round 2 { // read written data (both in delta and stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write + num_rows_write_2); } @@ -176,7 +176,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write + num_rows_write_2); } } @@ -206,7 +206,7 @@ try } { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); // only write two visible pks ASSERT_INPUTSTREAM_NROWS(in, 2); } @@ -242,7 +242,7 @@ try { // Round 1 { // read written data (only in delta) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows); } @@ -255,7 +255,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows); } } @@ -273,7 +273,7 @@ try { // Round 2 { // read written data (both in delta and stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows_2); } @@ -285,7 +285,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows_2); } } @@ -317,19 +317,19 @@ try // Thread A write_rows(100); ASSERT_INPUTSTREAM_NROWS( - segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}), + segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}), 100); auto snap = segment->createSnapshot(dmContext(), false, CurrentMetrics::DT_SnapshotOfRead); // Thread B write_rows(100); ASSERT_INPUTSTREAM_NROWS( - segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}), + segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}), 200); // Thread A { - auto in = segment->getInputStream( + auto in = segment->getInputStreamModeNormal( dmContext(), *tableColumns(), snap, @@ -365,7 +365,7 @@ try } auto get_rows = [&](const RowKeyRange & range) { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {range}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {range}); return getInputStreamNRows(in); }; @@ -435,7 +435,7 @@ try if (read_before_delete) { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -456,7 +456,7 @@ try { // read after delete range - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -467,7 +467,7 @@ try if (merge_delta_after_delete) { // read raw after delete range - auto in = segment->getInputStreamRaw(dmContext(), *tableColumns()); + auto in = segment->getInputStreamModeRaw(dmContext(), *tableColumns()); // Only 2 rows are left on disk, others are compacted. ASSERT_INPUTSTREAM_NROWS(in, 2); } @@ -488,7 +488,7 @@ try if (read_before_delete) { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -517,7 +517,7 @@ try { // read after delete range - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -528,7 +528,7 @@ try if (merge_delta_after_delete) { // read raw after delete range - auto in = segment->getInputStreamRaw(dmContext(), *tableColumns()); + auto in = segment->getInputStreamModeRaw(dmContext(), *tableColumns()); // Only 2 rows are left on disk, others are compacted. ASSERT_INPUTSTREAM_NROWS(in, 2); } @@ -558,7 +558,7 @@ try if (read_before_delete) { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -579,7 +579,7 @@ try { // read after delete range - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -590,7 +590,7 @@ try if (merge_delta_after_delete) { // read raw after delete range - auto in = segment->getInputStreamRaw(dmContext(), *tableColumns()); + auto in = segment->getInputStreamModeRaw(dmContext(), *tableColumns()); // Only 2 rows are left on disk, others are compacted. ASSERT_INPUTSTREAM_NROWS(in, 2); } @@ -633,7 +633,7 @@ try { // Read after deletion // The deleted range has no overlap with current data, so there should be no change - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -653,7 +653,7 @@ try { // Read after deletion // The deleted range has overlap range [63, 64) with current data, so the record with Handle 63 should be deleted - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -671,7 +671,7 @@ try { // Read after deletion - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); std::vector pk_coldata{0}; auto tmp = createNumbers(32, 63); @@ -694,7 +694,7 @@ try { // Read after deletion - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); std::vector pk_coldata{0}; auto tmp = createNumbers(32, 63); pk_coldata.insert(pk_coldata.end(), tmp.begin(), tmp.end()); @@ -716,7 +716,7 @@ try { // Read after deletion - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); std::vector pk_coldata = createNumbers(32, 63); ASSERT_INPUTSTREAM_COLS_UR( in, @@ -734,7 +734,7 @@ try { // Read after new write - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); std::vector pk_coldata = createNumbers(9, 16); auto tmp = createNumbers(32, 63); pk_coldata.insert(pk_coldata.end(), tmp.begin(), tmp.end()); @@ -765,7 +765,7 @@ try { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -784,8 +784,8 @@ try EXPECT_EQ(*s2_range.end.value, *old_range.end.value); // TODO check segment epoch is increase - size_t num_rows_seg1 = getInputStreamNRows(segment->getInputStream(dmContext(), *tableColumns(), {segment->getRowKeyRange()})); - size_t num_rows_seg2 = getInputStreamNRows(new_segment->getInputStream(dmContext(), *tableColumns(), {new_segment->getRowKeyRange()})); + size_t num_rows_seg1 = getInputStreamNRows(segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {segment->getRowKeyRange()})); + size_t num_rows_seg2 = getInputStreamNRows(new_segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {new_segment->getRowKeyRange()})); ASSERT_EQ(num_rows_seg1 + num_rows_seg2, num_rows_write); // delete rows in the right segment @@ -805,7 +805,7 @@ try // TODO check segment epoch is increase } { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_seg1); } } @@ -839,8 +839,8 @@ try // If they are equal, result will be true, otherwise it will be false. auto compare = [&](const SegmentPtr & seg1, const SegmentPtr & seg2, bool & result) { result = false; - auto in1 = seg1->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); - auto in2 = seg2->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in1 = seg1->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in2 = seg2->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); in1->readPrefix(); in2->readPrefix(); for (;;) @@ -972,7 +972,7 @@ try { // Read after writing - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); ASSERT_INPUTSTREAM_COLS_UR( in, Strings({DMTestEnv::pk_name}), @@ -1106,7 +1106,7 @@ try }; auto read_rows = [&](const SegmentPtr & segment) { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); return getInputStreamNRows(in); }; @@ -1249,7 +1249,7 @@ try { // read written data - auto in = segment->getInputStream(dmContext(), *columns_to_read, {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *columns_to_read, {RowKeyRange::newAll(false, 1)}); // check that we can read correct values ASSERT_INPUTSTREAM_COLS_UR( in, @@ -1281,7 +1281,7 @@ try { // read written data - auto in = segment->getInputStream(dmContext(), *columns_to_read, {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *columns_to_read, {RowKeyRange::newAll(false, 1)}); // check that we can read correct values // [0, 50) is the old signed values, [50, 100) is replaced by newer written values @@ -1307,7 +1307,7 @@ try { // check the stable data with new schema - auto in = segment->getInputStream(dmContext(), *columns_to_read, {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *columns_to_read, {RowKeyRange::newAll(false, 1)}); // check that we can read correct values std::vector i32_columndata = createSignedNumbers(0, num_rows_write / 2); @@ -1372,7 +1372,7 @@ try { // read written data - auto in = segment->getInputStream(dmContext(), *columns_after_ddl, {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *columns_after_ddl, {RowKeyRange::newAll(false, 1)}); // check that we can read correct values ASSERT_INPUTSTREAM_COLS_UR( @@ -1407,7 +1407,7 @@ try { // read written data - auto in = segment->getInputStream(dmContext(), *columns_after_ddl, {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *columns_after_ddl, {RowKeyRange::newAll(false, 1)}); // check that we can read correct values // First 50 values are default value @@ -1434,7 +1434,7 @@ try { // read written data after delta-merge - auto in = segment->getInputStream(dmContext(), *columns_after_ddl, {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *columns_after_ddl, {RowKeyRange::newAll(false, 1)}); // check that we can read correct values // First 50 values are default value diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp index dfab80df230..864fdb73b39 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp @@ -129,7 +129,7 @@ try { // Round 1 { // read written data (only in delta) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -140,7 +140,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } } @@ -164,7 +164,7 @@ try { // Round 2 { // read written data (both in delta and stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write + num_rows_write_2); } @@ -175,7 +175,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write + num_rows_write_2); } } @@ -220,7 +220,7 @@ try { // Round 1 { // read written data (only in delta) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows); } @@ -233,7 +233,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows); } } @@ -259,7 +259,7 @@ try { // Round 2 { // read written data (both in delta and stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows_2); } @@ -271,7 +271,7 @@ try { // read written data (only in stable) - auto in = segment->getInputStream(dmContext(), *tableColumns(), read_ranges); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), read_ranges); ASSERT_INPUTSTREAM_NROWS(in, expect_read_rows_2); } } @@ -306,7 +306,7 @@ try if (read_before_delete) { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -325,7 +325,7 @@ try { // read after delete range - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); const size_t nrows_after_delete = 2; // mock common handle auto common_handle_coldata = [this]() { @@ -367,7 +367,7 @@ try if (read_before_delete) { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -394,7 +394,7 @@ try { // read after delete range - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); const size_t nrows_after_delete = 2; // mock common handle auto common_handle_coldata = [this]() { @@ -453,7 +453,7 @@ try if (read_before_delete) { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -472,7 +472,7 @@ try { // read after delete range - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); const size_t nrows_after_delete = 2; // mock common handle auto common_handle_coldata = [this]() { @@ -529,7 +529,7 @@ try { // Read after deletion // The deleted range has no overlap with current data, so there should be no change - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); // mock common handle auto common_handle_coldata = [this]() { auto tmp = createNumbers(0, num_rows_write); @@ -557,7 +557,7 @@ try { // Read after deletion // The deleted range has overlap range [63, 64) with current data, so the record with Handle 63 should be deleted - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); // mock common handle auto common_handle_coldata = [this]() { std::vector int_coldata = createNumbers(0, 63); @@ -584,7 +584,7 @@ try { // Read after deletion - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); // mock common handle auto common_handle_coldata = [this]() { // the result should be [0, 32,33,34,...62] @@ -614,7 +614,7 @@ try { // Read after deletion - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); // mock common handle auto common_handle_coldata = [this]() { std::vector int_coldata{0}; @@ -643,7 +643,7 @@ try { // Read after deletion - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); // mock common handle auto common_handle_coldata = [this]() { std::vector int_coldata = createNumbers(32, 63); @@ -681,7 +681,7 @@ try { // read written data - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } @@ -700,8 +700,8 @@ try EXPECT_EQ(*s2_range.end.value, *old_range.end.value); // TODO check segment epoch is increase - size_t num_rows_seg1 = getInputStreamNRows(segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)})); - size_t num_rows_seg2 = getInputStreamNRows(segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)})); + size_t num_rows_seg1 = getInputStreamNRows(segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)})); + size_t num_rows_seg2 = getInputStreamNRows(segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)})); ASSERT_EQ(num_rows_seg1 + num_rows_seg2, num_rows_write); // merge segments @@ -714,7 +714,7 @@ try // TODO check segment epoch is increase } { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_INPUTSTREAM_NROWS(in, num_rows_write); } } @@ -727,8 +727,8 @@ try // If they are equal, result will be true, otherwise it will be false. auto compare = [&](const SegmentPtr & seg1, const SegmentPtr & seg2, bool & result) { result = false; - auto in1 = seg1->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); - auto in2 = seg2->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in1 = seg1->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in2 = seg2->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); in1->readPrefix(); in2->readPrefix(); for (;;) @@ -874,7 +874,7 @@ try { // Read after writing - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + auto in = segment->getInputStreamModeNormal(dmContext(), *tableColumns(), {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); ASSERT_EQ(temp.size(), num_batches_written * (num_rows_per_write - 2)); // mock common handle auto common_handle_coldata = [this, &temp]() { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp index 8a09cc87594..16c6c54d7bf 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp @@ -459,7 +459,7 @@ try writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 41); // 41 in memtable auto segment = segments[DELTA_MERGE_FIRST_SEGMENT_ID]; - auto in_stream = segment->getInputStreamRaw(*dm_context, *tableColumns()); + auto in_stream = segment->getInputStreamModeRaw(*dm_context, *tableColumns()); // Now let's replace data. auto block = prepareWriteBlock(0, 233); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 66e885532dc..2078ee7f2b0 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -64,7 +64,7 @@ size_t SegmentTestBasic::getSegmentRowNumWithoutMVCC(PageId segment_id) { RUNTIME_CHECK(segments.find(segment_id) != segments.end()); auto segment = segments[segment_id]; - auto in = segment->getInputStreamRaw(*dm_context, *tableColumns()); + auto in = segment->getInputStreamModeRaw(*dm_context, *tableColumns()); return getInputStreamNRows(in); } @@ -72,7 +72,7 @@ size_t SegmentTestBasic::getSegmentRowNum(PageId segment_id) { RUNTIME_CHECK(segments.find(segment_id) != segments.end()); auto segment = segments[segment_id]; - auto in = segment->getInputStream(*dm_context, *tableColumns(), {segment->getRowKeyRange()}); + auto in = segment->getInputStreamModeNormal(*dm_context, *tableColumns(), {segment->getRowKeyRange()}); return getInputStreamNRows(in); }