Skip to content

Commit

Permalink
Storages: Refine log (#8963)
Browse files Browse the repository at this point in the history
close #8864
  • Loading branch information
JinheLin authored Apr 18, 2024
1 parent 5cba983 commit 4da9e96
Show file tree
Hide file tree
Showing 32 changed files with 226 additions and 237 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
SelectQueryInfo query_info = create_query_info(physical_table_id);
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>(
mvcc_query_info->resolve_locks,
mvcc_query_info->read_tso,
mvcc_query_info->start_ts,
mvcc_query_info->scan_context);
ret.emplace(physical_table_id, std::move(query_info));
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Operators/DMSegmentThreadSourceOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp(
DM::AfterSegmentRead after_segment_read_,
const DM::ColumnDefines & columns_to_read_,
const DM::PushDownFilterPtr & filter_,
UInt64 max_version_,
UInt64 start_ts_,
size_t expected_block_size_,
DM::ReadMode read_mode_,
const String & req_id)
Expand All @@ -41,7 +41,7 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp(
, after_segment_read(after_segment_read_)
, columns_to_read(columns_to_read_)
, filter(filter_)
, max_version(max_version_)
, start_ts(start_ts_)
, expected_block_size(expected_block_size_)
, read_mode(read_mode_)
{
Expand Down Expand Up @@ -101,7 +101,7 @@ OperatorStatus DMSegmentThreadSourceOp::executeIOImpl()
task->read_snapshot,
task->ranges,
filter,
max_version,
start_ts,
block_size);
LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo());
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Operators/DMSegmentThreadSourceOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class DMSegmentThreadSourceOp : public SourceOp
DM::AfterSegmentRead after_segment_read_,
const DM::ColumnDefines & columns_to_read_,
const DM::PushDownFilterPtr & filter_,
UInt64 max_version_,
UInt64 start_ts_,
size_t expected_block_size_,
DM::ReadMode read_mode_,
const String & req_id);
Expand All @@ -59,7 +59,7 @@ class DMSegmentThreadSourceOp : public SourceOp
DM::AfterSegmentRead after_segment_read;
DM::ColumnDefines columns_to_read;
DM::PushDownFilterPtr filter;
const UInt64 max_version;
const UInt64 start_ts;
const size_t expected_block_size;
const DM::ReadMode read_mode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ void ColumnFileSetReader::getPlaceItems(
bool ColumnFileSetReader::shouldPlace(
const DMContext & context,
const RowKeyRange & relevant_range,
UInt64 max_version,
UInt64 start_ts,
size_t placed_rows)
{
auto & column_files = snapshot->getColumnFiles();
Expand Down Expand Up @@ -319,7 +319,7 @@ bool ColumnFileSetReader::shouldPlace(

for (auto i = rows_start_in_file; i < rows_end_in_file; ++i)
{
if (version_col_data[i] <= max_version && relevant_range.check(rkcc.getRowKeyValue(i)))
if (version_col_data[i] <= start_ts && relevant_range.check(rkcc.getRowKeyValue(i)))
return true;
}
}
Expand All @@ -334,7 +334,7 @@ bool ColumnFileSetReader::shouldPlace(

for (auto i = rows_start_in_file; i < rows_end_in_file; ++i)
{
if (version_col_data[i] <= max_version && relevant_range.check(rkcc.getRowKeyValue(i)))
if (version_col_data[i] <= start_ts && relevant_range.check(rkcc.getRowKeyValue(i)))
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class ColumnFileSetReader
bool shouldPlace(
const DMContext & context,
const RowKeyRange & relevant_range,
UInt64 max_version,
UInt64 start_ts,
size_t placed_rows);
};

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
AfterSegmentRead after_segment_read_,
const ColumnDefines & columns_to_read_,
const PushDownFilterPtr & filter_,
UInt64 max_version_,
UInt64 start_ts_,
size_t expected_block_size_,
ReadMode read_mode_,
const String & req_id)
Expand All @@ -55,7 +55,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
, columns_to_read(columns_to_read_)
, filter(filter_)
, header(toEmptyBlock(columns_to_read))
, max_version(max_version_)
, start_ts(start_ts_)
, expected_block_size(expected_block_size_)
, read_mode(read_mode_)
, log(Logger::get(req_id))
Expand Down Expand Up @@ -99,7 +99,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
task->read_snapshot,
task->ranges,
filter,
max_version,
start_ts,
block_size);
LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo());
}
Expand Down Expand Up @@ -131,7 +131,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
ColumnDefines columns_to_read;
PushDownFilterPtr filter;
Block header;
const UInt64 max_version;
const UInt64 start_ts;
const size_t expected_block_size;
const ReadMode read_mode;
size_t total_rows = 0;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ class DeltaValueReader
DeltaIndexPtr my_delta_index,
const RowKeyRange & segment_range,
const RowKeyRange & relevant_range,
UInt64 max_version);
UInt64 start_ts);
};

class DeltaValueInputStream : public SkippableBlockInputStream
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ bool DeltaValueReader::shouldPlace(
DeltaIndexPtr my_delta_index,
const RowKeyRange & segment_range_,
const RowKeyRange & relevant_range,
UInt64 max_version)
UInt64 start_ts)
{
auto [placed_rows, placed_delete_ranges] = my_delta_index->getPlacedStatus();

Expand All @@ -230,11 +230,11 @@ bool DeltaValueReader::shouldPlace(
return true;

size_t rows_in_persisted_file_snap = delta_snap->getMemTableSetRowsOffset();
return persisted_files_reader->shouldPlace(context, relevant_range, max_version, placed_rows)
return persisted_files_reader->shouldPlace(context, relevant_range, start_ts, placed_rows)
|| mem_table_reader->shouldPlace(
context,
relevant_range,
max_version,
start_ts,
placed_rows <= rows_in_persisted_file_snap ? 0 : placed_rows - rows_in_persisted_file_snap);
}

Expand Down
48 changes: 21 additions & 27 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ DM::WriteResult DeltaMergeStore::write(
}
LOG_DEBUG(
log,
"region_id: {}, applied_index: {}, record_count: {}, versions: {}",
"region_id={} applied_index={} record_count={} versions={}",
applied_status.region_id,
applied_status.applied_index,
block.rows(),
Expand Down Expand Up @@ -1169,7 +1169,7 @@ BlockInputStreams DeltaMergeStore::read(
const ColumnDefines & columns_to_read,
const RowKeyRanges & sorted_ranges,
size_t num_streams,
UInt64 max_version,
UInt64 start_ts,
const PushDownFilterPtr & filter,
const RuntimeFilteList & runtime_filter_list,
int rf_max_wait_time_ms,
Expand Down Expand Up @@ -1197,15 +1197,6 @@ BlockInputStreams DeltaMergeStore::read(
/*try_split_task =*/!enable_read_thread);
auto log_tracing_id = getLogTracingId(*dm_context);
auto tracing_logger = log->getChild(log_tracing_id);
LOG_INFO(
tracing_logger,
"Read create segment snapshot done, keep_order={} dt_enable_read_thread={} enable_read_thread={} "
"is_fast_scan={} is_push_down_filter_empty={}",
keep_order,
db_context.getSettingsRef().dt_enable_read_thread,
enable_read_thread,
is_fast_scan,
filter == nullptr || filter->before_where == nullptr);

auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) {
// TODO: Update the tracing_id before checkSegmentUpdate?
Expand All @@ -1219,7 +1210,7 @@ BlockInputStreams DeltaMergeStore::read(
extra_table_id_index,
columns_to_read,
filter,
max_version,
start_ts,
expected_block_size,
read_mode,
std::move(tasks),
Expand Down Expand Up @@ -1252,7 +1243,7 @@ BlockInputStreams DeltaMergeStore::read(
after_segment_read,
filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read,
filter,
max_version,
start_ts,
expected_block_size,
read_mode,
log_tracing_id);
Expand All @@ -1263,7 +1254,13 @@ BlockInputStreams DeltaMergeStore::read(
}
LOG_INFO(
tracing_logger,
"Read create stream done, pool_id={} num_streams={}",
"Read create stream done, keep_order={} dt_enable_read_thread={} enable_read_thread={} "
"is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={}",
keep_order,
db_context.getSettingsRef().dt_enable_read_thread,
enable_read_thread,
is_fast_scan,
filter == nullptr || filter->before_where == nullptr,
read_task_pool->pool_id,
final_num_stream);

Expand All @@ -1278,7 +1275,7 @@ void DeltaMergeStore::read(
const ColumnDefines & columns_to_read,
const RowKeyRanges & sorted_ranges,
size_t num_streams,
UInt64 max_version,
UInt64 start_ts,
const PushDownFilterPtr & filter,
const RuntimeFilteList & runtime_filter_list,
int rf_max_wait_time_ms,
Expand Down Expand Up @@ -1306,15 +1303,6 @@ void DeltaMergeStore::read(
/*try_split_task =*/!enable_read_thread);
auto log_tracing_id = getLogTracingId(*dm_context);
auto tracing_logger = log->getChild(log_tracing_id);
LOG_INFO(
tracing_logger,
"Read create segment snapshot done, keep_order={} dt_enable_read_thread={} enable_read_thread={} "
"is_fast_scan={} is_push_down_filter_empty={}",
keep_order,
db_context.getSettingsRef().dt_enable_read_thread,
enable_read_thread,
is_fast_scan,
filter == nullptr || filter->before_where == nullptr);

auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) {
// TODO: Update the tracing_id before checkSegmentUpdate?
Expand All @@ -1329,7 +1317,7 @@ void DeltaMergeStore::read(
extra_table_id_index,
columns_to_read,
filter,
max_version,
start_ts,
expected_block_size,
read_mode,
std::move(tasks),
Expand Down Expand Up @@ -1366,7 +1354,7 @@ void DeltaMergeStore::read(
after_segment_read,
columns_after_cast,
filter,
max_version,
start_ts,
expected_block_size,
read_mode,
log_tracing_id));
Expand All @@ -1383,7 +1371,13 @@ void DeltaMergeStore::read(

LOG_INFO(
tracing_logger,
"Read create PipelineExec done, pool_id={} num_streams={}",
"Read create PipelineExec done, keep_order={} dt_enable_read_thread={} enable_read_thread={} "
"is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={}",
keep_order,
db_context.getSettingsRef().dt_enable_read_thread,
enable_read_thread,
is_fast_scan,
filter == nullptr || filter->before_where == nullptr,
read_task_pool->pool_id,
final_num_stream);
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ class DeltaMergeStore : private boost::noncopyable
const ColumnDefines & columns_to_read,
const RowKeyRanges & sorted_ranges,
size_t num_streams,
UInt64 max_version,
UInt64 start_ts,
const PushDownFilterPtr & filter,
const RuntimeFilteList & runtime_filter_list,
int rf_max_wait_time_ms,
Expand All @@ -430,7 +430,7 @@ class DeltaMergeStore : private boost::noncopyable
const ColumnDefines & columns_to_read,
const RowKeyRanges & sorted_ranges,
size_t num_streams,
UInt64 max_version,
UInt64 start_ts,
const PushDownFilterPtr & filter,
const RuntimeFilteList & runtime_filter_list,
int rf_max_wait_time_ms,
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,10 @@ Block DMFileReader::read()

if (do_clean_read_on_normal_mode)
{
UInt64 max_version = 0;
UInt64 start_ts = 0;
for (size_t pack_id = start_pack_id; pack_id < next_pack_id; ++pack_id)
max_version = std::max(pack_filter.getMaxVersion(pack_id), max_version);
do_clean_read_on_normal_mode = max_version <= max_read_version;
start_ts = std::max(pack_filter.getMaxVersion(pack_id), start_ts);
do_clean_read_on_normal_mode = start_ts <= max_read_version;
}

for (size_t i = 0; i < read_columns.size(); ++i)
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class RNWorkerPrepareStreams
const auto & settings = task->dm_context->global_context.getSettingsRef();
task->initInputStream(
*columns_to_read,
read_tso,
start_ts,
push_down_filter,
read_mode,
settings.max_block_size,
Expand All @@ -53,7 +53,7 @@ class RNWorkerPrepareStreams

public:
const ColumnDefinesPtr columns_to_read;
const UInt64 read_tso;
const UInt64 start_ts;
const PushDownFilterPtr push_down_filter;
const ReadMode read_mode;

Expand All @@ -65,7 +65,7 @@ class RNWorkerPrepareStreams
const LoggerPtr & log;
const size_t concurrency;
const ColumnDefinesPtr & columns_to_read;
const UInt64 read_tso;
const UInt64 start_ts;
const PushDownFilterPtr & push_down_filter;
const ReadMode read_mode;
};
Expand All @@ -82,7 +82,7 @@ class RNWorkerPrepareStreams
options.log,
options.concurrency)
, columns_to_read(options.columns_to_read)
, read_tso(options.read_tso)
, start_ts(options.start_ts)
, push_down_filter(options.push_down_filter)
, read_mode(options.read_mode)
{}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ RNWorkers::RNWorkers(
.log = options.log,
.concurrency = prepare_streams_concurrency,
.columns_to_read = options.columns_to_read,
.read_tso = options.read_tso,
.start_ts = options.start_ts,
.push_down_filter = options.push_down_filter,
.read_mode = options.read_mode,
});
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class RNWorkers : private boost::noncopyable
{
const LoggerPtr log;
const ColumnDefinesPtr & columns_to_read;
const UInt64 read_tso;
const UInt64 start_ts;
const PushDownFilterPtr & push_down_filter;
const ReadMode read_mode;
};
Expand Down
Loading

0 comments on commit 4da9e96

Please sign in to comment.