diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 73317911a93..3a4f3caa966 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -935,7 +935,7 @@ std::unordered_map DAGStorageInterpreter::generateSele SelectQueryInfo query_info = create_query_info(physical_table_id); query_info.mvcc_query_info = std::make_unique( mvcc_query_info->resolve_locks, - mvcc_query_info->read_tso, + mvcc_query_info->start_ts, mvcc_query_info->scan_context); ret.emplace(physical_table_id, std::move(query_info)); } diff --git a/dbms/src/Operators/DMSegmentThreadSourceOp.cpp b/dbms/src/Operators/DMSegmentThreadSourceOp.cpp index 46c067d821b..49e6ca0e19c 100644 --- a/dbms/src/Operators/DMSegmentThreadSourceOp.cpp +++ b/dbms/src/Operators/DMSegmentThreadSourceOp.cpp @@ -31,7 +31,7 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp( DM::AfterSegmentRead after_segment_read_, const DM::ColumnDefines & columns_to_read_, const DM::PushDownFilterPtr & filter_, - UInt64 max_version_, + UInt64 start_ts_, size_t expected_block_size_, DM::ReadMode read_mode_, const String & req_id) @@ -41,7 +41,7 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp( , after_segment_read(after_segment_read_) , columns_to_read(columns_to_read_) , filter(filter_) - , max_version(max_version_) + , start_ts(start_ts_) , expected_block_size(expected_block_size_) , read_mode(read_mode_) { @@ -101,7 +101,7 @@ OperatorStatus DMSegmentThreadSourceOp::executeIOImpl() task->read_snapshot, task->ranges, filter, - max_version, + start_ts, block_size); LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo()); } diff --git a/dbms/src/Operators/DMSegmentThreadSourceOp.h b/dbms/src/Operators/DMSegmentThreadSourceOp.h index 3e89c01fcce..7ac9e410201 100644 --- a/dbms/src/Operators/DMSegmentThreadSourceOp.h +++ b/dbms/src/Operators/DMSegmentThreadSourceOp.h @@ -37,7 +37,7 @@ class DMSegmentThreadSourceOp : public SourceOp DM::AfterSegmentRead after_segment_read_, const DM::ColumnDefines & columns_to_read_, const DM::PushDownFilterPtr & filter_, - UInt64 max_version_, + UInt64 start_ts_, size_t expected_block_size_, DM::ReadMode read_mode_, const String & req_id); @@ -59,7 +59,7 @@ class DMSegmentThreadSourceOp : public SourceOp DM::AfterSegmentRead after_segment_read; DM::ColumnDefines columns_to_read; DM::PushDownFilterPtr filter; - const UInt64 max_version; + const UInt64 start_ts; const size_t expected_block_size; const DM::ReadMode read_mode; diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp index 8f38ab119ba..496bd983b1e 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp @@ -288,7 +288,7 @@ void ColumnFileSetReader::getPlaceItems( bool ColumnFileSetReader::shouldPlace( const DMContext & context, const RowKeyRange & relevant_range, - UInt64 max_version, + UInt64 start_ts, size_t placed_rows) { auto & column_files = snapshot->getColumnFiles(); @@ -319,7 +319,7 @@ bool ColumnFileSetReader::shouldPlace( for (auto i = rows_start_in_file; i < rows_end_in_file; ++i) { - if (version_col_data[i] <= max_version && relevant_range.check(rkcc.getRowKeyValue(i))) + if (version_col_data[i] <= start_ts && relevant_range.check(rkcc.getRowKeyValue(i))) return true; } } @@ -334,7 +334,7 @@ bool ColumnFileSetReader::shouldPlace( for (auto i = rows_start_in_file; i < rows_end_in_file; ++i) { - if (version_col_data[i] <= max_version && relevant_range.check(rkcc.getRowKeyValue(i))) + if (version_col_data[i] <= start_ts && relevant_range.check(rkcc.getRowKeyValue(i))) return true; } } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h index c9bb3edeeee..94ee43131d5 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h @@ -82,7 +82,7 @@ class ColumnFileSetReader bool shouldPlace( const DMContext & context, const RowKeyRange & relevant_range, - UInt64 max_version, + UInt64 start_ts, size_t placed_rows); }; diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index bafd2f43436..c85cb4ef9cd 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -45,7 +45,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream AfterSegmentRead after_segment_read_, const ColumnDefines & columns_to_read_, const PushDownFilterPtr & filter_, - UInt64 max_version_, + UInt64 start_ts_, size_t expected_block_size_, ReadMode read_mode_, const String & req_id) @@ -55,7 +55,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream , columns_to_read(columns_to_read_) , filter(filter_) , header(toEmptyBlock(columns_to_read)) - , max_version(max_version_) + , start_ts(start_ts_) , expected_block_size(expected_block_size_) , read_mode(read_mode_) , log(Logger::get(req_id)) @@ -99,7 +99,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream task->read_snapshot, task->ranges, filter, - max_version, + start_ts, block_size); LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo()); } @@ -131,7 +131,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream ColumnDefines columns_to_read; PushDownFilterPtr filter; Block header; - const UInt64 max_version; + const UInt64 start_ts; const size_t expected_block_size; const ReadMode read_mode; size_t total_rows = 0; diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index 526af6f682b..4451ca3389c 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -476,7 +476,7 @@ class DeltaValueReader DeltaIndexPtr my_delta_index, const RowKeyRange & segment_range, const RowKeyRange & relevant_range, - UInt64 max_version); + UInt64 start_ts); }; class DeltaValueInputStream : public SkippableBlockInputStream diff --git a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp index 30543398143..698ce530793 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp @@ -216,7 +216,7 @@ bool DeltaValueReader::shouldPlace( DeltaIndexPtr my_delta_index, const RowKeyRange & segment_range_, const RowKeyRange & relevant_range, - UInt64 max_version) + UInt64 start_ts) { auto [placed_rows, placed_delete_ranges] = my_delta_index->getPlacedStatus(); @@ -230,11 +230,11 @@ bool DeltaValueReader::shouldPlace( return true; size_t rows_in_persisted_file_snap = delta_snap->getMemTableSetRowsOffset(); - return persisted_files_reader->shouldPlace(context, relevant_range, max_version, placed_rows) + return persisted_files_reader->shouldPlace(context, relevant_range, start_ts, placed_rows) || mem_table_reader->shouldPlace( context, relevant_range, - max_version, + start_ts, placed_rows <= rows_in_persisted_file_snap ? 0 : placed_rows - rows_in_persisted_file_snap); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index dcfaa594296..bf7b29d8fb8 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -560,7 +560,7 @@ DM::WriteResult DeltaMergeStore::write( } LOG_DEBUG( log, - "region_id: {}, applied_index: {}, record_count: {}, versions: {}", + "region_id={} applied_index={} record_count={} versions={}", applied_status.region_id, applied_status.applied_index, block.rows(), @@ -1169,7 +1169,7 @@ BlockInputStreams DeltaMergeStore::read( const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, size_t num_streams, - UInt64 max_version, + UInt64 start_ts, const PushDownFilterPtr & filter, const RuntimeFilteList & runtime_filter_list, int rf_max_wait_time_ms, @@ -1197,15 +1197,6 @@ BlockInputStreams DeltaMergeStore::read( /*try_split_task =*/!enable_read_thread); auto log_tracing_id = getLogTracingId(*dm_context); auto tracing_logger = log->getChild(log_tracing_id); - LOG_INFO( - tracing_logger, - "Read create segment snapshot done, keep_order={} dt_enable_read_thread={} enable_read_thread={} " - "is_fast_scan={} is_push_down_filter_empty={}", - keep_order, - db_context.getSettingsRef().dt_enable_read_thread, - enable_read_thread, - is_fast_scan, - filter == nullptr || filter->before_where == nullptr); auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) { // TODO: Update the tracing_id before checkSegmentUpdate? @@ -1219,7 +1210,7 @@ BlockInputStreams DeltaMergeStore::read( extra_table_id_index, columns_to_read, filter, - max_version, + start_ts, expected_block_size, read_mode, std::move(tasks), @@ -1252,7 +1243,7 @@ BlockInputStreams DeltaMergeStore::read( after_segment_read, filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read, filter, - max_version, + start_ts, expected_block_size, read_mode, log_tracing_id); @@ -1263,7 +1254,13 @@ BlockInputStreams DeltaMergeStore::read( } LOG_INFO( tracing_logger, - "Read create stream done, pool_id={} num_streams={}", + "Read create stream done, keep_order={} dt_enable_read_thread={} enable_read_thread={} " + "is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={}", + keep_order, + db_context.getSettingsRef().dt_enable_read_thread, + enable_read_thread, + is_fast_scan, + filter == nullptr || filter->before_where == nullptr, read_task_pool->pool_id, final_num_stream); @@ -1278,7 +1275,7 @@ void DeltaMergeStore::read( const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, size_t num_streams, - UInt64 max_version, + UInt64 start_ts, const PushDownFilterPtr & filter, const RuntimeFilteList & runtime_filter_list, int rf_max_wait_time_ms, @@ -1306,15 +1303,6 @@ void DeltaMergeStore::read( /*try_split_task =*/!enable_read_thread); auto log_tracing_id = getLogTracingId(*dm_context); auto tracing_logger = log->getChild(log_tracing_id); - LOG_INFO( - tracing_logger, - "Read create segment snapshot done, keep_order={} dt_enable_read_thread={} enable_read_thread={} " - "is_fast_scan={} is_push_down_filter_empty={}", - keep_order, - db_context.getSettingsRef().dt_enable_read_thread, - enable_read_thread, - is_fast_scan, - filter == nullptr || filter->before_where == nullptr); auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) { // TODO: Update the tracing_id before checkSegmentUpdate? @@ -1329,7 +1317,7 @@ void DeltaMergeStore::read( extra_table_id_index, columns_to_read, filter, - max_version, + start_ts, expected_block_size, read_mode, std::move(tasks), @@ -1366,7 +1354,7 @@ void DeltaMergeStore::read( after_segment_read, columns_after_cast, filter, - max_version, + start_ts, expected_block_size, read_mode, log_tracing_id)); @@ -1383,7 +1371,13 @@ void DeltaMergeStore::read( LOG_INFO( tracing_logger, - "Read create PipelineExec done, pool_id={} num_streams={}", + "Read create PipelineExec done, keep_order={} dt_enable_read_thread={} enable_read_thread={} " + "is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={}", + keep_order, + db_context.getSettingsRef().dt_enable_read_thread, + enable_read_thread, + is_fast_scan, + filter == nullptr || filter->before_where == nullptr, read_task_pool->pool_id, final_num_stream); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 6c3931ef7b3..bf8ef39eeb4 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -405,7 +405,7 @@ class DeltaMergeStore : private boost::noncopyable const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, size_t num_streams, - UInt64 max_version, + UInt64 start_ts, const PushDownFilterPtr & filter, const RuntimeFilteList & runtime_filter_list, int rf_max_wait_time_ms, @@ -430,7 +430,7 @@ class DeltaMergeStore : private boost::noncopyable const ColumnDefines & columns_to_read, const RowKeyRanges & sorted_ranges, size_t num_streams, - UInt64 max_version, + UInt64 start_ts, const PushDownFilterPtr & filter, const RuntimeFilteList & runtime_filter_list, int rf_max_wait_time_ms, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 2f28bcb89c3..d0c7387df80 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -371,10 +371,10 @@ Block DMFileReader::read() if (do_clean_read_on_normal_mode) { - UInt64 max_version = 0; + UInt64 start_ts = 0; for (size_t pack_id = start_pack_id; pack_id < next_pack_id; ++pack_id) - max_version = std::max(pack_filter.getMaxVersion(pack_id), max_version); - do_clean_read_on_normal_mode = max_version <= max_read_version; + start_ts = std::max(pack_filter.getMaxVersion(pack_id), start_ts); + do_clean_read_on_normal_mode = start_ts <= max_read_version; } for (size_t i = 0; i < read_columns.size(); ++i) diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h index 7080c6fae65..b38f598de1a 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h @@ -41,7 +41,7 @@ class RNWorkerPrepareStreams const auto & settings = task->dm_context->global_context.getSettingsRef(); task->initInputStream( *columns_to_read, - read_tso, + start_ts, push_down_filter, read_mode, settings.max_block_size, @@ -53,7 +53,7 @@ class RNWorkerPrepareStreams public: const ColumnDefinesPtr columns_to_read; - const UInt64 read_tso; + const UInt64 start_ts; const PushDownFilterPtr push_down_filter; const ReadMode read_mode; @@ -65,7 +65,7 @@ class RNWorkerPrepareStreams const LoggerPtr & log; const size_t concurrency; const ColumnDefinesPtr & columns_to_read; - const UInt64 read_tso; + const UInt64 start_ts; const PushDownFilterPtr & push_down_filter; const ReadMode read_mode; }; @@ -82,7 +82,7 @@ class RNWorkerPrepareStreams options.log, options.concurrency) , columns_to_read(options.columns_to_read) - , read_tso(options.read_tso) + , start_ts(options.start_ts) , push_down_filter(options.push_down_filter) , read_mode(options.read_mode) {} diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp index 37c3834998c..975dae7db25 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp @@ -60,7 +60,7 @@ RNWorkers::RNWorkers( .log = options.log, .concurrency = prepare_streams_concurrency, .columns_to_read = options.columns_to_read, - .read_tso = options.read_tso, + .start_ts = options.start_ts, .push_down_filter = options.push_down_filter, .read_mode = options.read_mode, }); diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h index 246b3514ae3..69a44b52d81 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h @@ -46,7 +46,7 @@ class RNWorkers : private boost::noncopyable { const LoggerPtr log; const ColumnDefinesPtr & columns_to_read; - const UInt64 read_tso; + const UInt64 start_ts; const PushDownFilterPtr & push_down_filter; const ReadMode read_mode; }; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index b5957a3a058..9bf094e7360 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -803,7 +803,7 @@ BlockInputStreamPtr Segment::getInputStream( const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const PushDownFilterPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size) { Stopwatch sw; @@ -822,7 +822,7 @@ BlockInputStreamPtr Segment::getInputStream( segment_snap, read_ranges, filter ? filter->rs_operator : EMPTY_RS_OPERATOR, - max_version, + start_ts, clipped_block_rows); case ReadMode::Fast: return getInputStreamModeFast( @@ -846,7 +846,7 @@ BlockInputStreamPtr Segment::getInputStream( segment_snap, read_ranges, filter, - max_version, + start_ts, expected_block_size, clipped_block_rows); default: @@ -869,14 +869,14 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size, bool need_row_id) { LOG_TRACE(segment_snap->log, "Begin segment create input stream"); auto read_tag = need_row_id ? ReadTag::MVCC : ReadTag::Query; - auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, read_ranges, read_tag, max_version); + auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, read_ranges, read_tag, start_ts); RowKeyRanges real_ranges; for (const auto & read_range : read_ranges) @@ -900,7 +900,7 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( *read_info.read_columns, real_ranges, filter, - max_version, + start_ts, expected_block_size, false, read_tag); @@ -914,7 +914,7 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( *read_info.read_columns, real_ranges, filter, - max_version, + start_ts, expected_block_size, true, read_tag); @@ -932,7 +932,7 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( read_info.index_end, expected_block_size, read_tag, - max_version, + start_ts, need_row_id); } @@ -940,15 +940,15 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( stream = std::make_shared>( stream, columns_to_read, - max_version, + start_ts, is_common_handle, dm_context.tracing_id, dm_context.scan_context); LOG_TRACE( segment_snap->log, - "Finish segment create input stream, max_version={} range_size={} ranges={}", - max_version, + "Finish segment create input stream, start_ts={} range_size={} ranges={}", + start_ts, real_ranges.size(), DB::DM::toDebugString(read_ranges)); return stream; @@ -959,7 +959,7 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( const ColumnDefines & columns_to_read, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size) { auto segment_snap = createSnapshot(dm_context, false, CurrentMetrics::DT_SnapshotOfRead); @@ -971,7 +971,7 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal( segment_snap, read_ranges, filter, - max_version, + start_ts, expected_block_size); } @@ -2399,7 +2399,7 @@ Segment::ReadInfo Segment::getReadInfo( const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, ReadTag read_tag, - UInt64 max_version) const + UInt64 start_ts) const { LOG_DEBUG(segment_snap->log, "Begin segment getReadInfo"); @@ -2415,8 +2415,7 @@ Segment::ReadInfo Segment::getReadInfo( this->rowkey_range, ReadTag::MVCC); - auto [my_delta_index, fully_indexed] - = ensurePlace(dm_context, segment_snap, delta_reader, read_ranges, max_version); + auto [my_delta_index, fully_indexed] = ensurePlace(dm_context, segment_snap, delta_reader, read_ranges, start_ts); auto compacted_index = my_delta_index->getDeltaTree()->getCompactedEntries(); @@ -2479,7 +2478,7 @@ SkippableBlockInputStreamPtr Segment::getPlacedStream( const IndexIterator & delta_index_end, size_t expected_block_size, ReadTag read_tag, - UInt64 max_version, + UInt64 start_ts, bool need_row_id) { if (unlikely(rowkey_ranges.empty())) @@ -2490,7 +2489,7 @@ SkippableBlockInputStreamPtr Segment::getPlacedStream( read_columns, rowkey_ranges, filter, - max_version, + start_ts, expected_block_size, /* enable_handle_clean_read */ false, read_tag, @@ -2532,7 +2531,7 @@ std::pair Segment::ensurePlace( const SegmentSnapshotPtr & segment_snap, const DeltaValueReaderPtr & delta_reader, const RowKeyRanges & read_ranges, - UInt64 max_version) const + UInt64 start_ts) const { const auto & stable_snap = segment_snap->stable; auto delta_snap = delta_reader->getDeltaSnap(); @@ -2552,7 +2551,7 @@ std::pair Segment::ensurePlace( auto [my_placed_rows, my_placed_deletes] = my_delta_index->getPlacedStatus(); // Let's do a fast check, determine whether we need to do place or not. - if (!delta_reader->shouldPlace(dm_context, my_delta_index, rowkey_range, relevant_range, max_version)) + if (!delta_reader->shouldPlace(dm_context, my_delta_index, rowkey_range, relevant_range, start_ts)) return {my_delta_index, false}; CurrentMetrics::Increment cur_dm_segments{CurrentMetrics::DT_PlaceIndexUpdate}; @@ -2798,7 +2797,7 @@ BitmapFilterPtr Segment::buildBitmapFilter( const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size) { RUNTIME_CHECK_MSG(!dm_context.read_delta_only, "Read delta only is unsupported"); @@ -2809,12 +2808,12 @@ BitmapFilterPtr Segment::buildBitmapFilter( segment_snap, read_ranges, filter, - max_version, + start_ts, expected_block_size); } else { - return buildBitmapFilterNormal(dm_context, segment_snap, read_ranges, filter, max_version, expected_block_size); + return buildBitmapFilterNormal(dm_context, segment_snap, read_ranges, filter, start_ts, expected_block_size); } } @@ -2823,7 +2822,7 @@ BitmapFilterPtr Segment::buildBitmapFilterNormal( const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size) { Stopwatch sw_total; @@ -2837,7 +2836,7 @@ BitmapFilterPtr Segment::buildBitmapFilterNormal( segment_snap, read_ranges, filter, - max_version, + start_ts, expected_block_size, /*need_row_id*/ true); // `total_rows` is the rows read for building bitmap @@ -2875,7 +2874,7 @@ std::pair, std::vector> parseDMFilePackInfo( const DMContext & dm_context, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, - UInt64 max_version) + UInt64 start_ts) { // Packs that all rows compliant with MVCC filter and RowKey filter requirements. // For building bitmap filter, we don't need to read these packs, @@ -2922,7 +2921,7 @@ std::pair, std::vector> parseDMFilePackInfo( } if (handle_res[pack_id] == RSResult::Some || pack_stat.not_clean > 0 - || pack_filter.getMaxVersion(pack_id) > max_version) + || pack_filter.getMaxVersion(pack_id) > start_ts) { // We need to read this pack to do RowKey or MVCC filter. some_packs_set->insert(pack_id); @@ -2958,7 +2957,7 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly( const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size) { Stopwatch sw; @@ -2971,7 +2970,7 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly( return elapse_ns / 1'000'000.0; }; - auto [skipped_ranges, some_packs_sets] = parseDMFilePackInfo(dmfiles, dm_context, read_ranges, filter, max_version); + auto [skipped_ranges, some_packs_sets] = parseDMFilePackInfo(dmfiles, dm_context, read_ranges, filter, start_ts); if (skipped_ranges.size() == 1 && skipped_ranges[0].offset == 0 && skipped_ranges[0].rows == segment_snap->stable->getDMFilesRows()) @@ -3022,7 +3021,7 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly( columns_to_read, read_ranges, filter, - max_version, + start_ts, expected_block_size, /*enable_handle_clean_read*/ false, ReadTag::MVCC, @@ -3037,7 +3036,7 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly( stream = std::make_shared>( stream, read_columns, - max_version, + start_ts, is_common_handle, dm_context.tracing_id); bitmap_filter->set(stream); @@ -3059,7 +3058,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( const ColumnDefines & columns_to_read, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size) { // set `is_fast_scan` to true to try to enable clean read @@ -3072,7 +3071,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( columns_to_read, read_ranges, filter, - max_version, + start_ts, expected_block_size, enable_handle_clean_read, ReadTag::Query, @@ -3103,7 +3102,7 @@ BlockInputStreamPtr Segment::getLateMaterializationStream( const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & data_ranges, const PushDownFilterPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size) { // set `is_fast_scan` to true to try to enable clean read @@ -3118,7 +3117,7 @@ BlockInputStreamPtr Segment::getLateMaterializationStream( *filter_columns, data_ranges, filter->rs_operator, - max_version, + start_ts, expected_block_size, enable_handle_clean_read, ReadTag::LMFilter, @@ -3205,7 +3204,7 @@ BlockInputStreamPtr Segment::getLateMaterializationStream( *rest_columns_to_read, data_ranges, filter->rs_operator, - max_version, + start_ts, expected_block_size, enable_handle_clean_read, ReadTag::Query, @@ -3257,7 +3256,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const PushDownFilterPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t build_bitmap_filter_block_rows, size_t read_data_block_rows) { @@ -3272,7 +3271,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( segment_snap, real_ranges, filter ? filter->rs_operator : EMPTY_RS_OPERATOR, - max_version, + start_ts, build_bitmap_filter_block_rows); // If we don't need to read the cacheable columns, release column cache as soon as possible. @@ -3291,7 +3290,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( segment_snap, real_ranges, filter, - max_version, + start_ts, read_data_block_rows); } @@ -3302,7 +3301,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream( columns_to_read, real_ranges, filter ? filter->rs_operator : EMPTY_RS_OPERATOR, - max_version, + start_ts, read_data_block_rows); } diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 35155b11e86..9add3a6f9fc 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -214,7 +214,7 @@ class Segment const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const PushDownFilterPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size); BlockInputStreamPtr getInputStreamModeNormal( @@ -223,7 +223,7 @@ class Segment const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size, bool need_row_id = false); @@ -232,7 +232,7 @@ class Segment const ColumnDefines & columns_to_read, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter = {}, - UInt64 max_version = std::numeric_limits::max(), + UInt64 start_ts = std::numeric_limits::max(), size_t expected_block_size = DEFAULT_BLOCK_SIZE); /** @@ -609,7 +609,7 @@ class Segment const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, ReadTag read_tag, - UInt64 max_version = std::numeric_limits::max()) const; + UInt64 start_ts = std::numeric_limits::max()) const; static ColumnDefinesPtr arrangeReadColumns(const ColumnDefine & handle, const ColumnDefines & columns_to_read); @@ -626,7 +626,7 @@ class Segment const IndexIterator & delta_index_end, size_t expected_block_size, ReadTag read_tag, - UInt64 max_version = std::numeric_limits::max(), + UInt64 start_ts = std::numeric_limits::max(), bool need_row_id = false); /// Make sure that all delta packs have been placed. @@ -637,7 +637,7 @@ class Segment const SegmentSnapshotPtr & segment_snap, const DeltaValueReaderPtr & delta_reader, const RowKeyRanges & read_ranges, - UInt64 max_version) const; + UInt64 start_ts) const; /// Reference the inserts/updates by delta tree. /// Returns fully placed or not. Some rows not match relevant_range are not placed. @@ -670,21 +670,21 @@ class Segment const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size); BitmapFilterPtr buildBitmapFilterNormal( const DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size); BitmapFilterPtr buildBitmapFilterStableOnly( const DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size); BlockInputStreamPtr getBitmapFilterInputStream( BitmapFilterPtr && bitmap_filter, @@ -693,7 +693,7 @@ class Segment const ColumnDefines & columns_to_read, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size); BlockInputStreamPtr getBitmapFilterInputStream( const DMContext & dm_context, @@ -701,7 +701,7 @@ class Segment const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const PushDownFilterPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t build_bitmap_filter_block_rows, size_t read_data_block_rows); @@ -712,7 +712,7 @@ class Segment const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & data_ranges, const PushDownFilterPtr & filter, - UInt64 max_version, + UInt64 start_ts, size_t expected_block_size); // clipBlockRows try to limit the block size not exceed settings.max_block_bytes. diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp index a272f4442a4..a5467a579df 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp @@ -242,7 +242,7 @@ void SegmentReadTask::initColumnFileDataProvider(const Remote::RNLocalPageCacheG void SegmentReadTask::initInputStream( const ColumnDefines & columns_to_read, - UInt64 read_tso, + UInt64 start_ts, const PushDownFilterPtr & push_down_filter, ReadMode read_mode, size_t expected_block_size, @@ -250,7 +250,7 @@ void SegmentReadTask::initInputStream( { if (likely(doInitInputStreamWithErrorFallback( columns_to_read, - read_tso, + start_ts, push_down_filter, read_mode, expected_block_size, @@ -266,12 +266,12 @@ void SegmentReadTask::initInputStream( { cache->setDeltaIndex(read_snapshot->delta->getSharedDeltaIndex()); } - doInitInputStream(columns_to_read, read_tso, push_down_filter, read_mode, expected_block_size); + doInitInputStream(columns_to_read, start_ts, push_down_filter, read_mode, expected_block_size); } bool SegmentReadTask::doInitInputStreamWithErrorFallback( const ColumnDefines & columns_to_read, - UInt64 read_tso, + UInt64 start_ts, const PushDownFilterPtr & push_down_filter, ReadMode read_mode, size_t expected_block_size, @@ -279,7 +279,7 @@ bool SegmentReadTask::doInitInputStreamWithErrorFallback( { try { - doInitInputStream(columns_to_read, read_tso, push_down_filter, read_mode, expected_block_size); + doInitInputStream(columns_to_read, start_ts, push_down_filter, read_mode, expected_block_size); return true; } catch (const Exception & e) @@ -298,7 +298,7 @@ bool SegmentReadTask::doInitInputStreamWithErrorFallback( void SegmentReadTask::doInitInputStream( const ColumnDefines & columns_to_read, - UInt64 read_tso, + UInt64 start_ts, const PushDownFilterPtr & push_down_filter, ReadMode read_mode, size_t expected_block_size) @@ -317,7 +317,7 @@ void SegmentReadTask::doInitInputStream( read_snapshot, ranges, push_down_filter, - read_tso, + start_ts, expected_block_size); } diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h index ae6ca6543a2..a91c9930e83 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h @@ -94,7 +94,7 @@ struct SegmentReadTask void initInputStream( const ColumnDefines & columns_to_read, - UInt64 read_tso, + UInt64 start_ts, const PushDownFilterPtr & push_down_filter, ReadMode read_mode, size_t expected_block_size, @@ -132,7 +132,7 @@ struct SegmentReadTask bool doInitInputStreamWithErrorFallback( const ColumnDefines & columns_to_read, - UInt64 read_tso, + UInt64 start_ts, const PushDownFilterPtr & push_down_filter, ReadMode read_mode, size_t expected_block_size, @@ -140,7 +140,7 @@ struct SegmentReadTask void doInitInputStream( const ColumnDefines & columns_to_read, - UInt64 read_tso, + UInt64 start_ts, const PushDownFilterPtr & push_down_filter, ReadMode read_mode, size_t expected_block_size); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 711cb5ee335..b9bd5facae1 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -96,7 +96,7 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t } t->initInputStream( columns_to_read, - max_version, + start_ts, filter, read_mode, expected_block_size, @@ -118,7 +118,7 @@ SegmentReadTaskPool::SegmentReadTaskPool( int extra_table_id_index_, const ColumnDefines & columns_to_read_, const PushDownFilterPtr & filter_, - uint64_t max_version_, + uint64_t start_ts_, size_t expected_block_size_, ReadMode read_mode_, SegmentReadTasks && tasks_, @@ -132,7 +132,7 @@ SegmentReadTaskPool::SegmentReadTaskPool( , extra_table_id_index(extra_table_id_index_) , columns_to_read(columns_to_read_) , filter(filter_) - , max_version(max_version_) + , start_ts(start_ts_) , expected_block_size(expected_block_size_) , read_mode(read_mode_) , tasks_wrapper(enable_read_thread_, std::move(tasks_)) diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index dfcddc1eed4..1ae455f17f7 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -107,7 +107,7 @@ class SegmentReadTaskPool int extra_table_id_index_, const ColumnDefines & columns_to_read_, const PushDownFilterPtr & filter_, - uint64_t max_version_, + uint64_t start_ts_, size_t expected_block_size_, ReadMode read_mode_, SegmentReadTasks && tasks_, @@ -117,7 +117,7 @@ class SegmentReadTaskPool Int64 num_streams_, const String & res_group_name_); - ~SegmentReadTaskPool() + ~SegmentReadTaskPool() override { auto [pop_times, pop_empty_times, max_queue_size] = q.getStat(); auto pop_empty_ratio = pop_times > 0 ? pop_empty_times * 1.0 / pop_times : 0.0; @@ -126,7 +126,7 @@ class SegmentReadTaskPool auto blk_avg_bytes = total_count > 0 ? total_bytes / total_count : 0; auto approximate_max_pending_block_bytes = blk_avg_bytes * max_queue_size; auto total_rows = blk_stat.totalRows(); - LOG_DEBUG( + LOG_INFO( log, "Done. pool_id={} pop={} pop_empty={} pop_empty_ratio={} " "max_queue_size={} blk_avg_bytes={} approximate_max_pending_block_bytes={:.2f}MB " @@ -214,7 +214,7 @@ class SegmentReadTaskPool const int extra_table_id_index; ColumnDefines columns_to_read; PushDownFilterPtr filter; - const uint64_t max_version; + const uint64_t start_ts; const size_t expected_block_size; const ReadMode read_mode; SegmentReadTasksWrapper tasks_wrapper; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 30df27c2826..731819e4228 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -289,7 +289,7 @@ try *new_cols, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -476,7 +476,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -564,7 +564,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -592,7 +592,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), std::make_shared(filter), std::vector{}, 0, @@ -682,7 +682,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -767,7 +767,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -820,7 +820,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -850,7 +850,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -931,7 +931,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1009,7 +1009,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1033,7 +1033,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ static_cast(1), + /* start_ts= */ static_cast(1), EMPTY_FILTER, std::vector{}, 0, @@ -1081,7 +1081,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1114,7 +1114,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1145,7 +1145,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1186,7 +1186,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1208,7 +1208,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ tso2, + /* start_ts= */ tso2, EMPTY_FILTER, std::vector{}, 0, @@ -1230,7 +1230,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ tso1, + /* start_ts= */ tso1, EMPTY_FILTER, std::vector{}, 0, @@ -1252,7 +1252,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ tso1 - 1, + /* start_ts= */ tso1 - 1, EMPTY_FILTER, std::vector{}, 0, @@ -1312,7 +1312,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ tso1, + /* start_ts= */ tso1, EMPTY_FILTER, std::vector{}, 0, @@ -1341,7 +1341,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ tso2 - 1, + /* start_ts= */ tso2 - 1, EMPTY_FILTER, std::vector{}, 0, @@ -1372,7 +1372,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ tso3 - 1, + /* start_ts= */ tso3 - 1, EMPTY_FILTER, std::vector{}, 0, @@ -1395,7 +1395,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1419,7 +1419,7 @@ try columns, {range0, range1}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1480,7 +1480,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ tso1, + /* start_ts= */ tso1, EMPTY_FILTER, std::vector{}, 0, @@ -1509,7 +1509,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ tso2 - 1, + /* start_ts= */ tso2 - 1, EMPTY_FILTER, std::vector{}, 0, @@ -1540,7 +1540,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1585,7 +1585,7 @@ try store->getTableColumns(), {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, /* rf_max_wait_time_ms= */ 0, @@ -1816,7 +1816,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1904,7 +1904,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1990,7 +1990,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2061,7 +2061,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2130,7 +2130,7 @@ try store->getTableColumns(), {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2181,7 +2181,7 @@ try store->getTableColumns(), {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2231,7 +2231,7 @@ try store->getTableColumns(), {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2281,7 +2281,7 @@ try store->getTableColumns(), {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2331,7 +2331,7 @@ try store->getTableColumns(), {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2387,7 +2387,7 @@ try store->getTableColumns(), {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2463,7 +2463,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2571,7 +2571,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2622,7 +2622,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2695,7 +2695,7 @@ try store->getTableColumns(), {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2743,7 +2743,7 @@ try store->getTableColumns(), {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2858,7 +2858,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -2971,7 +2971,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -3051,7 +3051,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -3085,7 +3085,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ static_cast(1), + /* start_ts= */ static_cast(1), EMPTY_FILTER, std::vector{}, 0, @@ -3146,7 +3146,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -3188,7 +3188,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -3259,7 +3259,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp index 9f01e8151ed..54c1a84602a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp @@ -252,7 +252,7 @@ class DeltaMergeStoreTestFastAddPeer columns, {range}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp index d3018254c55..3a37dee9bdc 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_scan.cpp @@ -84,7 +84,7 @@ TEST_P(DeltaMergeStoreRWTest, TestFastScanWithOnlyInsertWithoutRangeFilter) columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -171,7 +171,7 @@ TEST_P(DeltaMergeStoreRWTest, TestFastScanWithOnlyInsertWithRangeFilter) columns, key_ranges, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -252,7 +252,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -388,7 +388,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -503,7 +503,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -621,7 +621,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -706,7 +706,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -879,7 +879,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -908,7 +908,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -957,7 +957,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -985,7 +985,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1048,7 +1048,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1136,7 +1136,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1231,7 +1231,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ static_cast(1), + /* start_ts= */ static_cast(1), EMPTY_FILTER, std::vector{}, 0, @@ -1288,7 +1288,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -1335,7 +1335,7 @@ try real_columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp index dcd3b410836..89a35793ebe 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_ingest.cpp @@ -249,7 +249,7 @@ try store->getTableColumns(), {RowKeyRange::newAll(is_common_handle, 1)}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp index fc8b78f9f81..74b01176a47 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp @@ -335,7 +335,7 @@ size_t SimplePKTestBasic::getRowsN() const store->getTableColumns(), {RowKeyRange::newAll(is_common_handle, 1)}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -354,7 +354,7 @@ size_t SimplePKTestBasic::getRowsN(Int64 start_key, Int64 end_key) const store->getTableColumns(), {buildRowRange(start_key, end_key)}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_reader.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_reader.cpp index d60fbda11ea..ccc5d0db469 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_reader.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_reader.cpp @@ -88,7 +88,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -102,7 +102,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, @@ -211,7 +211,7 @@ try columns, {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), EMPTY_FILTER, std::vector{}, 0, diff --git a/dbms/src/Storages/KVStore/Read/LearnerRead.cpp b/dbms/src/Storages/KVStore/Read/LearnerRead.cpp index 619a838de4e..9ff6b04a262 100644 --- a/dbms/src/Storages/KVStore/Read/LearnerRead.cpp +++ b/dbms/src/Storages/KVStore/Read/LearnerRead.cpp @@ -121,9 +121,9 @@ void validateQueryInfo( } } -MvccQueryInfo::MvccQueryInfo(bool resolve_locks_, UInt64 read_tso_, DM::ScanContextPtr scan_ctx) - : read_tso(read_tso_) - , resolve_locks(read_tso_ == std::numeric_limits::max() ? false : resolve_locks_) +MvccQueryInfo::MvccQueryInfo(bool resolve_locks_, UInt64 start_ts_, DM::ScanContextPtr scan_ctx) + : start_ts(start_ts_) + , resolve_locks(start_ts_ == std::numeric_limits::max() ? false : resolve_locks_) , scan_context(std::move(scan_ctx)) { // using `std::numeric_limits::max()` to resolve lock may break basic logic. diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp index 0b17fd156aa..cad8876e2ea 100644 --- a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp +++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp @@ -115,9 +115,8 @@ std::vector LearnerReadWorker::buildBatchReadIndexReq batch_read_index_req.reserve(regions_info.size()); // If using `std::numeric_limits::max()`, set `start-ts` 0 to get the latest index but let read-index-worker do not record as history. - auto read_index_tso - = mvcc_query_info.read_tso == std::numeric_limits::max() ? 0 : mvcc_query_info.read_tso; - if (read_index_tso == 0) + auto start_ts = mvcc_query_info.start_ts == std::numeric_limits::max() ? 0 : mvcc_query_info.start_ts; + if (start_ts == 0) { GET_METRIC(tiflash_raft_read_index_events_count, type_zero_read_tso).Increment(); } @@ -125,8 +124,8 @@ std::vector LearnerReadWorker::buildBatchReadIndexReq { const RegionID region_id = region_to_query.region_id; // don't stale read in test scenarios. - bool can_stale_read = mvcc_query_info.read_tso != std::numeric_limits::max() - && read_index_tso <= region_table.getSelfSafeTS(region_id); + bool can_stale_read = mvcc_query_info.start_ts != std::numeric_limits::max() + && start_ts <= region_table.getSelfSafeTS(region_id); if (can_stale_read) { batch_read_index_result.emplace(region_id, kvrpcpb::ReadIndexResponse()); @@ -147,7 +146,7 @@ std::vector LearnerReadWorker::buildBatchReadIndexReq { // generate request for read index const auto & region = regions_snapshot.find(region_id)->second; - batch_read_index_req.emplace_back(GenRegionReadIndexReq(*region, read_index_tso)); + batch_read_index_req.emplace_back(GenRegionReadIndexReq(*region, start_ts)); ++stats.num_read_index_request; } } @@ -328,15 +327,14 @@ RegionsReadIndexResult LearnerReadWorker::readIndex( log, log_lvl, "[Learner Read] Batch read index, num_regions={} num_requests={} num_stale_read={} num_cached_index={} " - "num_unavailable={} " - "cost={}ms, read_tso={}", + "num_unavailable={} cost={}ms, start_ts={}", stats.num_regions, stats.num_read_index_request, stats.num_stale_read, stats.num_cached_read_index, unavailable_regions.size(), stats.read_index_elapsed_ms, - mvcc_query_info.read_tso); + mvcc_query_info.start_ts); return batch_read_index_result; } @@ -401,7 +399,7 @@ void LearnerReadWorker::waitIndex( tmt, physical_table_id, region, - mvcc_query_info.read_tso, + mvcc_query_info.start_ts, region_to_query.bypass_lock_ts, region_to_query.version, region_to_query.conf_version, @@ -432,11 +430,11 @@ void LearnerReadWorker::waitIndex( LOG_IMPL( log, log_lvl, - "[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}, read_tso={}", + "[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}, start_ts={}", stats.wait_index_elapsed_ms, stats.num_regions, unavailable_regions.size(), - mvcc_query_info.read_tso); + mvcc_query_info.start_ts); auto bypass_formatter = [&](const RegionQueryInfo & query_info) -> String { if (query_info.bypass_lock_ts == nullptr) @@ -470,10 +468,10 @@ void LearnerReadWorker::waitIndex( LOG_DEBUG( log, - "[Learner Read] Learner Read Summary, regions_info={}, unavailable_regions_info={}, read_tso={}", + "[Learner Read] Learner Read Summary, regions_info={}, unavailable_regions_info={}, start_ts={}", region_info_formatter(), unavailable_regions.toDebugString(), - mvcc_query_info.read_tso); + mvcc_query_info.start_ts); } std::tuple // @@ -512,14 +510,14 @@ LearnerReadWorker::waitUntilDataAvailable( log, log_lvl, "[Learner Read] batch read index | wait index" - " total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}, read_tso={}", + " total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}, start_ts={}", time_elapsed_ms, stats.read_index_elapsed_ms, stats.wait_index_elapsed_ms, stats.num_regions, stats.num_stale_read, unavailable_regions.size(), - mvcc_query_info.read_tso); + mvcc_query_info.start_ts); return {start_time, end_time}; } diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index 16426df352f..01c08bb3414 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -336,7 +336,7 @@ void verifyRows(Context & ctx, DM::DeltaMergeStorePtr store, const DM::RowKeyRan columns, {range}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), DM::EMPTY_FILTER, std::vector{}, 0, diff --git a/dbms/src/Storages/RegionQueryInfo.h b/dbms/src/Storages/RegionQueryInfo.h index 0d08272b088..66f74f38d71 100644 --- a/dbms/src/Storages/RegionQueryInfo.h +++ b/dbms/src/Storages/RegionQueryInfo.h @@ -63,7 +63,7 @@ struct RegionQueryInfo struct MvccQueryInfo { - const UInt64 read_tso; + const UInt64 start_ts; const bool resolve_locks; @@ -77,7 +77,7 @@ struct MvccQueryInfo DM::ScanContextPtr scan_context; public: - explicit MvccQueryInfo(bool resolve_locks_ = false, UInt64 read_tso_ = 0, DM::ScanContextPtr scan_ctx = nullptr); + explicit MvccQueryInfo(bool resolve_locks_ = false, UInt64 start_ts_ = 0, DM::ScanContextPtr scan_ctx = nullptr); void addReadIndexResToCache(RegionID region_id, UInt64 read_index) { read_index_res_cache[region_id] = read_index; } UInt64 getReadIndexRes(RegionID region_id) const diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index b91f0dc2832..5e017055270 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -646,7 +646,7 @@ void setColumnsToRead( } // Check whether tso is smaller than TiDB GcSafePoint -void checkReadTso(UInt64 read_tso, const Context & context, const String & req_id, KeyspaceID keyspace_id) +void checkStartTs(UInt64 start_ts, const Context & context, const String & req_id, KeyspaceID keyspace_id) { auto & tmt = context.getTMTContext(); RUNTIME_CHECK(tmt.isInitialized()); @@ -658,12 +658,12 @@ void checkReadTso(UInt64 read_tso, const Context & context, const String & req_i keyspace_id, /* ignore_cache= */ false, context.getSettingsRef().safe_point_update_interval_seconds); - if (read_tso < safe_point) + if (start_ts < safe_point) { throw TiFlashException( Errors::Coprocessor::BadRequest, - "read tso is smaller than tidb gc safe point! read_tso={} safepoint={} req={}", - read_tso, + "read tso is smaller than tidb gc safe point! start_ts={} safepoint={} req={}", + start_ts, safe_point, req_id); } @@ -676,10 +676,8 @@ DM::RowKeyRanges StorageDeltaMerge::parseMvccQueryInfo( const String & req_id, const LoggerPtr & tracing_logger) { - LOG_DEBUG(tracing_logger, "Read with tso: {}", mvcc_query_info.read_tso); - auto keyspace_id = getTableInfo().getKeyspaceID(); - checkReadTso(mvcc_query_info.read_tso, context, req_id, keyspace_id); + checkStartTs(mvcc_query_info.start_ts, context, req_id, keyspace_id); FmtBuffer fmt_buf; if (unlikely(tracing_logger->is(Poco::Message::Priority::PRIO_TRACE))) @@ -982,7 +980,7 @@ BlockInputStreams StorageDeltaMerge::read( columns_to_read, ranges, num_streams, - /*max_version=*/mvcc_query_info.read_tso, + /*start_ts=*/mvcc_query_info.start_ts, filter, runtime_filter_list, query_info.dag_query == nullptr ? 0 : query_info.dag_query->rf_max_wait_time_ms, @@ -995,8 +993,8 @@ BlockInputStreams StorageDeltaMerge::read( scan_context); auto keyspace_id = getTableInfo().getKeyspaceID(); - /// Ensure read_tso info after read. - checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id, keyspace_id); + /// Ensure start_ts info after read. + checkStartTs(mvcc_query_info.start_ts, context, query_info.req_id, keyspace_id); LOG_TRACE(tracing_logger, "[ranges: {}] [streams: {}]", ranges.size(), streams.size()); @@ -1073,7 +1071,7 @@ void StorageDeltaMerge::read( columns_to_read, ranges, num_streams, - /*max_version=*/mvcc_query_info.read_tso, + /*start_ts=*/mvcc_query_info.start_ts, filter, runtime_filter_list, query_info.dag_query == nullptr ? 0 : query_info.dag_query->rf_max_wait_time_ms, @@ -1086,8 +1084,8 @@ void StorageDeltaMerge::read( scan_context); auto keyspace_id = getTableInfo().getKeyspaceID(); - /// Ensure read_tso info after read. - checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id, keyspace_id); + /// Ensure start_ts info after read. + checkStartTs(mvcc_query_info.start_ts, context, query_info.req_id, keyspace_id); LOG_TRACE(tracing_logger, "[ranges: {}] [concurrency: {}]", ranges.size(), group_builder.concurrency()); } @@ -1123,8 +1121,8 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr StorageDeltaMerge::writeNodeBuild snap->column_defines = std::make_shared(columns_to_read); auto keyspace_id = getTableInfo().getKeyspaceID(); - // Ensure read_tso is valid after snapshot is built - checkReadTso(mvcc_query_info.read_tso, context, query_info.req_id, keyspace_id); + // Ensure start_ts is valid after snapshot is built + checkStartTs(mvcc_query_info.start_ts, context, query_info.req_id, keyspace_id); return snap; } diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index f3d7b67c253..8346e4a9a1e 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -538,7 +538,7 @@ std::variant StorageDisagg table_scan.isFastScan(), table_scan.keepOrder(), push_down_filter); - const UInt64 read_tso = sender_target_mpp_task_id.gather_id.query_id.start_ts; + const UInt64 start_ts = sender_target_mpp_task_id.gather_id.query_id.start_ts; const auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread; LOG_INFO( log, @@ -558,7 +558,7 @@ std::variant StorageDisagg extra_table_id_index, *column_defines, push_down_filter, - read_tso, + start_ts, db_context.getSettingsRef().max_block_size, read_mode, std::move(read_tasks), @@ -576,7 +576,7 @@ std::variant StorageDisagg { .log = log->getChild(executor_id), .columns_to_read = column_defines, - .read_tso = read_tso, + .start_ts = start_ts, .push_down_filter = push_down_filter, .read_mode = read_mode, },