Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support tiflash_fastscan variable from tidb, remove fast mode in table_info, and change name fast mode to fast scan #5589

Merged
merged 15 commits into from
Aug 30, 2022
Merged
2 changes: 1 addition & 1 deletion contrib/tipb
2 changes: 0 additions & 2 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ DBGInvoker::DBGInvoker()
regSchemafulFunc("query_mapped", dbgFuncQueryMapped);
regSchemalessFunc("get_tiflash_replica_count", dbgFuncGetTiflashReplicaCount);
regSchemalessFunc("get_partition_tables_tiflash_replica_count", dbgFuncGetPartitionTablesTiflashReplicaCount);
regSchemalessFunc("get_tiflash_mode", dbgFuncGetTiflashMode);
regSchemalessFunc("get_partition_tables_tiflash_mode", dbgFuncGetPartitionTablesTiflashMode);

regSchemalessFunc("search_log_for_key", dbgFuncSearchLogForKey);
regSchemalessFunc("tidb_dag", dbgFuncTiDBQueryFromNaturalDag);
Expand Down
53 changes: 0 additions & 53 deletions dbms/src/Debug/dbgFuncSchemaName.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,57 +180,4 @@ void dbgFuncGetPartitionTablesTiflashReplicaCount(Context & context, const ASTs

output(fmt_buf.toString());
}

// Print the TiFlash mode of a single table, addressed by its mapped name.
// Usage:
//   ./storage-client.sh "DBGInvoke get_tiflash_mode(db_name, table_name)"
//
// args[0] is the database name and args[1] is the table name; both are required.
// Throws Exception(BAD_ARGUMENTS) when the argument count is not exactly two, or
// when the resolved storage is not an IManageableStorage.
void dbgFuncGetTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
    // Exactly two arguments are required, so the usage text must not advertise
    // the table name as optional.
    if (args.size() != 2)
        throw Exception("Args not matched, should be: database-name, table-name", ErrorCodes::BAD_ARGUMENTS);

    const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
    const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;

    // NOTE(review): `mapped` is dereferenced without a check; confirm that
    // mappedTable() can never hand back an empty result for these inputs.
    auto mapped = mappedTable(context, database_name, table_name);
    auto storage = context.getTable(mapped->first, mapped->second);
    auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
    if (!managed_storage)
        throw Exception(database_name + "." + table_name + " is not ManageableStorage", ErrorCodes::BAD_ARGUMENTS);

    FmtBuffer fmt_buf;
    fmt_buf.append(TiFlashModeToString(managed_storage->getTableInfo().tiflash_mode));

    output(fmt_buf.toString());
}

// Print the TiFlash mode of every partition of a logical partition table,
// addressed by its mapped name. Each mode is followed by a '/' separator,
// including the last one.
// Usage:
//   ./storage-client.sh "DBGInvoke get_partition_tables_tiflash_mode(db_name, table_name)"
//
// args[0] is the database name and args[1] is the table name; both are required.
// Throws Exception(BAD_ARGUMENTS) when the argument count is not exactly two, when
// the resolved storage is not an IManageableStorage, when the table is not a
// logical partition table, or when a partition has no registered storage.
void dbgFuncGetPartitionTablesTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
    // Exactly two arguments are required, so the usage text must not advertise
    // the table name as optional.
    if (args.size() != 2)
        throw Exception("Args not matched, should be: database-name, table-name", ErrorCodes::BAD_ARGUMENTS);

    const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
    const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;

    // NOTE(review): `mapped` is dereferenced without a check; confirm that
    // mappedTable() can never hand back an empty result for these inputs.
    auto mapped = mappedTable(context, database_name, table_name);
    auto storage = context.getTable(mapped->first, mapped->second);
    auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
    if (!managed_storage)
        throw Exception(database_name + "." + table_name + " is not ManageableStorage", ErrorCodes::BAD_ARGUMENTS);

    auto table_info = managed_storage->getTableInfo();

    if (!table_info.isLogicalPartitionTable())
        throw Exception(database_name + "." + table_name + " is not logical partition table", ErrorCodes::BAD_ARGUMENTS);

    FmtBuffer fmt_buf;
    SchemaNameMapper name_mapper;
    for (const auto & part_def : table_info.partition.definitions)
    {
        auto partition_table_info = table_info.producePartitionTableInfo(part_def.id, name_mapper);
        auto partition_storage = context.getTMTContext().getStorages().get(partition_table_info->id);
        // Fail with a readable error instead of dereferencing a missing storage.
        // NOTE(review): presumably get() yields a null pointer for an unknown id — confirm.
        if (!partition_storage)
            throw Exception(database_name + "." + table_name + " has a partition without storage", ErrorCodes::BAD_ARGUMENTS);

        fmt_buf.append(TiFlashModeToString(partition_storage->getTableInfo().tiflash_mode));
        fmt_buf.append("/");
    }

    output(fmt_buf.toString());
}

} // namespace DB
10 changes: 0 additions & 10 deletions dbms/src/Debug/dbgFuncSchemaName.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,4 @@ void dbgFuncGetTiflashReplicaCount(Context & context, const ASTs & args, DBGInvo
// ./storage-client.sh "DBGInvoke get_partition_tables_tiflash_replica_count(db_name, table_name)"
void dbgFuncGetPartitionTablesTiflashReplicaCount(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Get table's tiflash mode with mapped table name
// Usage:
// ./storage-client.sh "DBGInvoke get_tiflash_mode(db_name, table_name)"
void dbgFuncGetTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Get the logical table's partition tables' tiflash modes with mapped table name
// Usage:
// ./storage-client.sh "DBGInvoke get_partition_tables_tiflash_mode(db_name, table_name)"
void dbgFuncGetPartitionTablesTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output);

} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
context.getTimezoneInfo());
query_info.req_id = fmt::format("{} Table<{}>", log->identifier(), table_id);
query_info.keep_order = table_scan.keepOrder();
query_info.is_fast_scan = table_scan.isFastScan();
return query_info;
};
if (table_scan.isPartitionTableScan())
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ TiDBTableScan::TiDBTableScan(
// Only non-partition tables need to keep order when the table scan executor requires keep order.
// If keep_order is not set, keep order for safety.
, keep_order(!is_partition_table_scan && (table_scan->tbl_scan().keep_order() || !table_scan->tbl_scan().has_keep_order()))
, is_fast_scan(table_scan->tbl_scan().is_fast_scan())
{
if (is_partition_table_scan)
{
Expand Down Expand Up @@ -73,6 +74,7 @@ void TiDBTableScan::constructTableScanForRemoteRead(tipb::TableScan * tipb_table
tipb_table_scan->set_next_read_engine(tipb::EngineType::Local);
for (auto id : partition_table_scan.primary_prefix_column_ids())
tipb_table_scan->add_primary_prefix_column_ids(id);
tipb_table_scan->set_is_fast_scan(partition_table_scan.is_fast_scan());
}
else
{
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ class TiDBTableScan
return keep_order;
}

// Returns the is_fast_scan flag stored in this TiDBTableScan.
bool isFastScan() const
{
return is_fast_scan;
}

private:
const tipb::Executor * table_scan;
String executor_id;
Expand All @@ -71,6 +76,7 @@ class TiDBTableScan
std::vector<Int64> physical_table_ids;
Int64 logical_table_id;
bool keep_order;
bool is_fast_scan;
};

} // namespace DB
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
const RSOperatorPtr & filter,
const String & tracing_id,
bool keep_order,
bool is_fast_mode,
bool is_fast_scan,
size_t expected_block_size,
const SegmentIdSet & read_segments,
size_t extra_table_id_index)
Expand Down Expand Up @@ -1277,8 +1277,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
filter,
max_version,
expected_block_size,
/* is_raw = */ is_fast_mode,
/* do_delete_mark_filter_for_raw = */ is_fast_mode,
/* is_raw = */ is_fast_scan,
/* do_delete_mark_filter_for_raw = */ is_fast_scan,
std::move(tasks),
after_segment_read);

Expand Down Expand Up @@ -1308,8 +1308,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
filter,
max_version,
expected_block_size,
/* is_raw_ */ is_fast_mode,
/* do_delete_mark_filter_for_raw_ */ is_fast_mode,
/* is_raw_= */ is_fast_scan,
/* do_delete_mark_filter_for_raw_= */ is_fast_scan,
extra_table_id_index,
physical_table_id,
req_info);
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,9 @@ class DeltaMergeStore : private boost::noncopyable


/// Read rows in two modes:
/// when is_fast_mode == false, we are in normal mode. Thus we will read rows with MVCC filtering, del mark !=0 filter and sorted merge
/// when is_fast_mode == true, we are in fast mode. Thus we will read rows without MVCC and sorted merge
/// `sorted_ranges` should be already sorted and merged
/// when is_fast_scan == false, we will read rows with MVCC filtering, del mark !=0 filter and sorted merge.
/// when is_fast_scan == true, we will read rows without MVCC and sorted merge.
/// `sorted_ranges` should be already sorted and merged.
BlockInputStreams read(const Context & db_context,
const DB::Settings & db_settings,
const ColumnDefines & columns_to_read,
Expand All @@ -371,7 +371,7 @@ class DeltaMergeStore : private boost::noncopyable
const RSOperatorPtr & filter,
const String & tracing_id,
bool keep_order,
bool is_fast_mode = false, // set true when read in fast mode
bool is_fast_scan = false,
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
const SegmentIdSet & read_segments = {},
size_t extra_table_id_index = InvalidColumnID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
is_common_handle,
enable_handle_clean_read,
enable_del_clean_read,
is_fast_mode,
is_fast_scan,
max_data_version,
std::move(pack_filter),
mark_cache,
Expand Down
22 changes: 11 additions & 11 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,19 @@ class DMFileBlockInputStreamBuilder
// **** filters **** //

// Only set enable_handle_clean_read_ param to true when
// in normal mode:
// 1. There is no delta.
// 2. You don't need pk, version and delete_tag columns
// in fast mode:
// 1. You don't need pk columns
// If you have no idea what it means, then simply set it to false.
// Only set is_fast_mode_ param to true when read in fast mode.
// Only set enable_del_clean_read_ param to true when you don't need del columns in fast mode.
// in normal mode (is_fast_scan_ == false):
// 1. There is no delta.
// 2. You don't need pk, version and delete_tag columns
// in fast scan mode (is_fast_scan_ == true):
// 1. You don't need pk columns
// If you have no idea what it means, then simply set it to false.
// Only set enable_del_clean_read_ param to true when you don't need del columns in fast scan.
// `max_data_version_` is the MVCC filter version for reading. Used by clean read check
DMFileBlockInputStreamBuilder & enableCleanRead(bool enable_handle_clean_read_, bool is_fast_mode_, bool enable_del_clean_read_, UInt64 max_data_version_)
DMFileBlockInputStreamBuilder & enableCleanRead(bool enable_handle_clean_read_, bool is_fast_scan_, bool enable_del_clean_read_, UInt64 max_data_version_)
hongyunyan marked this conversation as resolved.
Show resolved Hide resolved
{
enable_handle_clean_read = enable_handle_clean_read_;
enable_del_clean_read = enable_del_clean_read_;
is_fast_mode = is_fast_mode_;
is_fast_scan = is_fast_scan_;
max_data_version = max_data_version_;
return *this;
}
Expand Down Expand Up @@ -159,8 +158,9 @@ class DMFileBlockInputStreamBuilder
FileProviderPtr file_provider;

// clean read

bool enable_handle_clean_read = false;
bool is_fast_mode = false;
bool is_fast_scan = false;
bool enable_del_clean_read = false;
UInt64 max_data_version = std::numeric_limits<UInt64>::max();
// Rough set filter
Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ DMFileReader::DMFileReader(
// clean read
bool enable_handle_clean_read_,
bool enable_del_clean_read_,
bool is_fast_mode_,
bool is_fast_scan_,
UInt64 max_read_version_,
// filters
DMFilePackFilter && pack_filter_,
Expand All @@ -233,7 +233,7 @@ DMFileReader::DMFileReader(
, single_file_mode(dmfile_->isSingleFileMode())
, enable_handle_clean_read(enable_handle_clean_read_)
, enable_del_clean_read(enable_del_clean_read_)
, is_fast_mode(is_fast_mode_)
, is_fast_scan(is_fast_scan_)
, max_read_version(max_read_version_)
, pack_filter(std::move(pack_filter_))
, skip_packs_by_column(read_columns.size(), 0)
Expand Down Expand Up @@ -364,10 +364,11 @@ Block DMFileReader::read()
}

// TODO: this will need better algorithm: we should separate those packs which can and can not do clean read.
bool do_clean_read_on_normal_mode = enable_handle_clean_read && expected_handle_res == All && not_clean_rows == 0 && (!is_fast_mode);
bool do_clean_read_on_normal_mode = enable_handle_clean_read && expected_handle_res == All && not_clean_rows == 0 && (!is_fast_scan);

bool do_clean_read_on_handle_on_fast_mode = enable_handle_clean_read && is_fast_scan && expected_handle_res == All;
bool do_clean_read_on_del_on_fast_mode = enable_del_clean_read && is_fast_scan && deleted_rows == 0;

bool do_clean_read_on_handle_on_fast_mode = enable_handle_clean_read && is_fast_mode && expected_handle_res == All;
bool do_clean_read_on_del_on_fast_mode = enable_del_clean_read && is_fast_mode && deleted_rows == 0;

if (do_clean_read_on_normal_mode)
{
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class DMFileReader
// If you have no idea what it means, then simply set it to false.
bool enable_handle_clean_read_,
bool enable_del_clean_read_,
bool is_fast_mode_,
bool is_fast_scan_,
// The MVCC filter version. Used by clean read check.
UInt64 max_read_version_,
// filters
Expand Down Expand Up @@ -148,7 +148,8 @@ class DMFileReader
// if we don't need del column, we will try to do clean read on del_column(enable_del_clean_read is true).
const bool enable_handle_clean_read;
const bool enable_del_clean_read;
const bool is_fast_mode;
const bool is_fast_scan;

const UInt64 max_read_version;

/// Filters
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,8 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
/// But this way seems not to be robust enough, maybe we need another flag?
auto new_columns_to_read = std::make_shared<ColumnDefines>();

// new_columns_to_read holds at most columns_to_read.size() + 2 entries, because the handle column and the del_mark column may be inserted in addition.
new_columns_to_read->reserve(columns_to_read.size() + 2);

new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle));
if (filter_delete_mark)
Expand Down Expand Up @@ -578,7 +580,7 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
std::numeric_limits<UInt64>::max(),
expected_block_size,
/* enable_handle_clean_read */ enable_handle_clean_read,
/* is_fast_mode */ filter_delete_mark,
/* is_fast_scan */ filter_delete_mark,
/* enable_del_clean_read */ enable_del_clean_read);

BlockInputStreamPtr delta_stream = std::make_shared<DeltaValueInputStream>(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,17 +330,17 @@ StableValueSpace::Snapshot::getInputStream(
UInt64 max_data_version,
size_t expected_block_size,
bool enable_handle_clean_read,
bool is_fast_mode,
bool is_fast_scan,
bool enable_del_clean_read)
{
LOG_FMT_DEBUG(log, "max_data_version: {}, enable_handle_clean_read: {}, is_fast_mode: {}, enable_del_clean_read: {}", max_data_version, enable_handle_clean_read, is_fast_mode, enable_del_clean_read);
LOG_FMT_DEBUG(log, "max_data_version: {}, enable_handle_clean_read: {}, is_fast_mode: {}, enable_del_clean_read: {}", max_data_version, enable_handle_clean_read, is_fast_scan, enable_del_clean_read);
SkippableBlockInputStreams streams;

for (size_t i = 0; i < stable->files.size(); i++)
{
DMFileBlockInputStreamBuilder builder(context.db_context);
builder
.enableCleanRead(enable_handle_clean_read, is_fast_mode, enable_del_clean_read, max_data_version)
.enableCleanRead(enable_handle_clean_read, is_fast_scan, enable_del_clean_read, max_data_version)
.setRSOperator(filter)
.setColumnCache(column_caches[i])
.setTracingID(context.tracing_id)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
UInt64 max_data_version,
size_t expected_block_size,
bool enable_handle_clean_read,
bool is_fast_mode = false,
bool is_fast_scan = false,
bool enable_del_clean_read = false);

RowsAndBytes getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ target_link_libraries(dm_test_storage_delta_merge
add_executable(dm_test_delta_merge_store EXCLUDE_FROM_ALL gtest_dm_delta_merge_store.cpp)
target_link_libraries(dm_test_delta_merge_store dbms gtest_main clickhouse_functions)

add_executable(dm_test_delta_merge_store_for_fast_mode EXCLUDE_FROM_ALL gtest_dm_delta_merge_store_for_fast_mode.cpp)
target_link_libraries(dm_test_delta_merge_store_for_fast_mode dbms gtest_main clickhouse_functions)
add_executable(dm_test_delta_merge_store_for_fast_scan EXCLUDE_FROM_ALL gtest_dm_delta_merge_store_for_fast_scan.cpp)
target_link_libraries(dm_test_delta_merge_store_for_fast_scan dbms gtest_main clickhouse_functions)

add_executable(dm_test_segment EXCLUDE_FROM_ALL gtest_dm_segment.cpp)
target_link_libraries(dm_test_segment dbms gtest_main clickhouse_functions)
Expand Down
Loading