support PartitionTableScan in TiFlash (#3876)
close #3873
windtalker authored Mar 14, 2022
1 parent 41f18b5 commit 2b14421
Showing 36 changed files with 1,303 additions and 549 deletions.
2 changes: 1 addition & 1 deletion contrib/tipb
3 changes: 2 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
@@ -40,7 +40,8 @@ namespace DB
M(tiflash_coprocessor_executor_count, "Total number of each executor", Counter, F(type_ts, {"type", "table_scan"}), \
F(type_sel, {"type", "selection"}), F(type_agg, {"type", "aggregation"}), F(type_topn, {"type", "top_n"}), \
F(type_limit, {"type", "limit"}), F(type_join, {"type", "join"}), F(type_exchange_sender, {"type", "exchange_sender"}), \
F(type_exchange_receiver, {"type", "exchange_receiver"}), F(type_projection, {"type", "projection"})) \
F(type_exchange_receiver, {"type", "exchange_receiver"}), F(type_projection, {"type", "projection"}), \
F(type_partition_ts, {"type", "partition_table_scan"})) \
M(tiflash_coprocessor_request_duration_seconds, "Bucketed histogram of request duration", Histogram, \
F(type_batch, {{"type", "batch"}}, ExpBuckets{0.0005, 2, 30}), F(type_cop, {{"type", "cop"}}, ExpBuckets{0.0005, 2, 30}), \
F(type_super_batch, {{"type", "super_batch"}}, ExpBuckets{0.0005, 2, 30}), \
139 changes: 96 additions & 43 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
@@ -261,6 +261,18 @@ DAGProperties getDAGProperties(const String & prop_string)
return ret;
}

void setTipbRegionInfo(coprocessor::RegionInfo * tipb_region_info, const std::pair<RegionID, RegionPtr> & region, TableID table_id)
{
tipb_region_info->set_region_id(region.first);
auto * meta = tipb_region_info->mutable_region_epoch();
meta->set_conf_ver(region.second->confVer());
meta->set_version(region.second->version());
auto * range = tipb_region_info->add_ranges();
auto handle_range = getHandleRangeByTable(region.second->getRange()->rawKeys(), table_id);
range->set_start(RecordKVFormat::genRawKey(table_id, handle_range.first.handle_id));
range->set_end(RecordKVFormat::genRawKey(table_id, handle_range.second.handle_id));
}

BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DAGProperties & properties, QueryTasks & query_tasks, MakeResOutputStream & func_wrap_output_stream)
{
if (properties.is_mpp_query)
@@ -288,22 +300,44 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA
if (table_id != -1)
{
/// contains a table scan
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(table_id);
if (regions.size() < static_cast<size_t>(properties.mpp_partition_num))
throw Exception("Not supported: table region num less than mpp partition num");
for (size_t i = 0; i < regions.size(); i++)
const auto & table_info = MockTiDB::instance().getTableInfoByID(table_id);
if (table_info->is_partition_table)
{
if (i % properties.mpp_partition_num != static_cast<size_t>(task.partition_id))
continue;
auto * region = req->add_regions();
region->set_region_id(regions[i].first);
auto * meta = region->mutable_region_epoch();
meta->set_conf_ver(regions[i].second->confVer());
meta->set_version(regions[i].second->version());
auto * range = region->add_ranges();
auto handle_range = getHandleRangeByTable(regions[i].second->getRange()->rawKeys(), table_id);
range->set_start(RecordKVFormat::genRawKey(table_id, handle_range.first.handle_id));
range->set_end(RecordKVFormat::genRawKey(table_id, handle_range.second.handle_id));
size_t current_region_size = 0;
coprocessor::TableRegions * current_table_regions = nullptr;
for (const auto & partition : table_info->partition.definitions)
{
const auto partition_id = partition.id;
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(partition_id);
for (size_t i = 0; i < regions.size(); ++i)
{
if ((current_region_size + i) % properties.mpp_partition_num != static_cast<size_t>(task.partition_id))
continue;
if (current_table_regions != nullptr && current_table_regions->physical_table_id() != partition_id)
current_table_regions = nullptr;
if (current_table_regions == nullptr)
{
current_table_regions = req->add_table_regions();
current_table_regions->set_physical_table_id(partition_id);
}
setTipbRegionInfo(current_table_regions->add_regions(), regions[i], partition_id);
}
current_region_size += regions.size();
}
if (current_region_size < static_cast<size_t>(properties.mpp_partition_num))
throw Exception("Not supported: table region num less than mpp partition num");
}
else
{
auto regions = context.getTMTContext().getRegionTable().getRegionsByTable(table_id);
if (regions.size() < static_cast<size_t>(properties.mpp_partition_num))
throw Exception("Not supported: table region num less than mpp partition num");
for (size_t i = 0; i < regions.size(); ++i)
{
if (i % properties.mpp_partition_num != static_cast<size_t>(task.partition_id))
continue;
setTipbRegionInfo(req->add_regions(), regions[i], table_id);
}
}
}
pingcap::kv::RpcCall<mpp::DispatchTaskRequest> call(req);
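Note: the hunk above assigns regions to MPP partitions round-robin. Regions from every physical partition of the table are counted with one running index, and a region goes to the MPP partition whose id equals that index modulo mpp_partition_num, grouped per physical table in the dispatch request. A minimal standalone sketch of that assignment follows; the type aliases and helper name are illustrative, not the TiFlash API.

#include <cstdint>
#include <map>
#include <vector>

// Illustrative stand-ins for the real TiFlash types.
using RegionID = uint64_t;
using TableID = int64_t;

// For each physical partition, pick the regions a given MPP partition should scan.
// A single running index across all partitions drives the round-robin split,
// mirroring the `(current_region_size + i) % mpp_partition_num` check above.
std::map<TableID, std::vector<RegionID>> pickRegionsForMppPartition(
    const std::map<TableID, std::vector<RegionID>> & regions_by_physical_table,
    size_t mpp_partition_num,
    size_t mpp_partition_id)
{
    std::map<TableID, std::vector<RegionID>> picked;
    size_t seen = 0; // regions contributed by earlier physical tables
    for (const auto & [physical_table_id, regions] : regions_by_physical_table)
    {
        for (size_t i = 0; i < regions.size(); ++i)
        {
            if ((seen + i) % mpp_partition_num == mpp_partition_id)
                picked[physical_table_id].push_back(regions[i]);
        }
        seen += regions.size();
    }
    return picked;
}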
@@ -413,11 +447,12 @@ void dbgFuncTiDBQueryFromNaturalDag(Context & context, const ASTs & args, DBGInv
static Poco::Logger * log = &Poco::Logger::get("MockDAG");
LOG_INFO(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString());
tipb::SelectResponse dag_response;
RegionInfoMap regions;
regions.emplace(region_id, RegionInfo(region_id, region->version(), region->confVer(), std::move(key_ranges), nullptr));
TablesRegionsInfo tables_regions_info(true);
auto & table_regions_info = tables_regions_info.getSingleTableRegions();
table_regions_info.local_regions.emplace(region_id, RegionInfo(region_id, region->version(), region->confVer(), std::move(key_ranges), nullptr));

DAGContext dag_context(dag_request);
dag_context.regions_for_local_read = regions;
dag_context.tables_regions_info = std::move(tables_regions_info);
dag_context.log = std::make_shared<LogWithPrefix>(&Poco::Logger::get("MockDAG"), "");
context.setDAGContext(&dag_context);
DAGDriver driver(context, properties.start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);
@@ -1097,32 +1132,49 @@ struct TableScan : public Executor
output_schema.erase(std::remove_if(output_schema.begin(), output_schema.end(), [&](const auto & field) { return used_columns.count(field.first) == 0; }),
output_schema.end());
}
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t, const MPPInfo &, const Context &) override

void setTipbColumnInfo(tipb::ColumnInfo * ci, const DAGColumnInfo & dag_column_info) const
{
tipb_executor->set_tp(tipb::ExecType::TypeTableScan);
tipb_executor->set_executor_id(name);
auto * ts = tipb_executor->mutable_tbl_scan();
ts->set_table_id(table_info.id);
for (const auto & info : output_schema)
{
tipb::ColumnInfo * ci = ts->add_columns();
auto column_name = splitQualifiedName(info.first).second;
if (column_name == MutableSupport::tidb_pk_column_name)
ci->set_column_id(-1);
else
ci->set_column_id(table_info.getColumnID(column_name));
ci->set_tp(info.second.tp);
ci->set_flag(info.second.flag);
ci->set_columnlen(info.second.flen);
ci->set_decimal(info.second.decimal);
if (!info.second.elems.empty())
auto column_name = splitQualifiedName(dag_column_info.first).second;
if (column_name == MutableSupport::tidb_pk_column_name)
ci->set_column_id(-1);
else
ci->set_column_id(table_info.getColumnID(column_name));
ci->set_tp(dag_column_info.second.tp);
ci->set_flag(dag_column_info.second.flag);
ci->set_columnlen(dag_column_info.second.flen);
ci->set_decimal(dag_column_info.second.decimal);
if (!dag_column_info.second.elems.empty())
{
for (const auto & pair : dag_column_info.second.elems)
{
for (const auto & pair : info.second.elems)
{
ci->add_elems(pair.first);
}
ci->add_elems(pair.first);
}
}
}

bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t, const MPPInfo &, const Context &) override
{
if (table_info.is_partition_table)
{
tipb_executor->set_tp(tipb::ExecType::TypePartitionTableScan);
tipb_executor->set_executor_id(name);
auto * partition_ts = tipb_executor->mutable_partition_table_scan();
partition_ts->set_table_id(table_info.id);
for (const auto & info : output_schema)
setTipbColumnInfo(partition_ts->add_columns(), info);
for (const auto & partition : table_info.partition.definitions)
partition_ts->add_partition_ids(partition.id);
}
else
{
tipb_executor->set_tp(tipb::ExecType::TypeTableScan);
tipb_executor->set_executor_id(name);
auto * ts = tipb_executor->mutable_tbl_scan();
ts->set_table_id(table_info.id);
for (const auto & info : output_schema)
setTipbColumnInfo(ts->add_columns(), info);
}
return true;
}
void toMPPSubPlan(size_t &, const DAGProperties &, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiver>, std::shared_ptr<ExchangeSender>>> &) override
@@ -2466,12 +2518,13 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest
static Poco::Logger * log = &Poco::Logger::get("MockDAG");
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString());
tipb::SelectResponse dag_response;
RegionInfoMap regions;
TablesRegionsInfo tables_regions_info(true);
auto & table_regions_info = tables_regions_info.getSingleTableRegions();

regions.emplace(region_id, RegionInfo(region_id, region_version, region_conf_version, std::move(key_ranges), nullptr));
table_regions_info.local_regions.emplace(region_id, RegionInfo(region_id, region_version, region_conf_version, std::move(key_ranges), nullptr));

DAGContext dag_context(dag_request);
dag_context.regions_for_local_read = regions;
dag_context.tables_regions_info = std::move(tables_regions_info);
dag_context.log = std::make_shared<LogWithPrefix>(log, "");
context.setDAGContext(&dag_context);

25 changes: 5 additions & 20 deletions dbms/src/Flash/BatchCoprocessorHandler.cpp
@@ -43,33 +43,18 @@ grpc::Status BatchCoprocessorHandler::execute()
{ GET_METRIC(tiflash_coprocessor_handling_request_count, type_super_batch_cop_dag).Decrement(); });

auto dag_request = getDAGRequestFromStringWithRetry(cop_request->data());
RegionInfoMap regions;
RegionInfoList retry_regions;
for (auto & r : cop_request->regions())
{
auto res = regions.emplace(r.region_id(),
RegionInfo(
r.region_id(),
r.region_epoch().version(),
r.region_epoch().conf_ver(),
GenCopKeyRange(r.ranges()),
nullptr));
if (!res.second)
{
retry_regions.emplace_back(RegionInfo(r.region_id(), r.region_epoch().version(), r.region_epoch().conf_ver(), CoprocessorHandler::GenCopKeyRange(r.ranges()), nullptr));
}
}
auto tables_regions_info = TablesRegionsInfo::create(cop_request->regions(), cop_request->table_regions(), cop_context.db_context.getTMTContext());
LOG_FMT_DEBUG(
log,
"{}: Handling {} regions in DAG request: {}",
"{}: Handling {} regions from {} physical tables in DAG request: {}",
__PRETTY_FUNCTION__,
regions.size(),
tables_regions_info.regionCount(),
tables_regions_info.tableCount(),
dag_request.DebugString());

DAGContext dag_context(dag_request);
dag_context.is_batch_cop = true;
dag_context.regions_for_local_read = std::move(regions);
dag_context.regions_for_remote_read = std::move(retry_regions);
dag_context.tables_regions_info = std::move(tables_regions_info);
dag_context.log = std::make_shared<LogWithPrefix>(log, "");
dag_context.tidb_host = cop_context.db_context.getClientInfo().current_address.toString();
cop_context.db_context.setDAGContext(&dag_context);
4 changes: 2 additions & 2 deletions dbms/src/Flash/BatchCoprocessorHandler.h
@@ -19,10 +19,10 @@ class BatchCoprocessorHandler : public CoprocessorHandler

~BatchCoprocessorHandler() = default;

grpc::Status execute();
grpc::Status execute() override;

protected:
grpc::Status recordError(grpc::StatusCode err_code, const String & err_msg);
grpc::Status recordError(grpc::StatusCode err_code, const String & err_msg) override;

protected:
const coprocessor::BatchRequest * cop_request;
10 changes: 10 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
@@ -209,4 +209,14 @@ int DAGContext::getNewThreadCountOfExchangeReceiver() const
return new_thread_count_of_exchange_receiver;
}

bool DAGContext::containsRegionsInfoForTable(Int64 table_id) const
{
return tables_regions_info.containsRegionsInfoForTable(table_id);
}

const SingleTableRegions & DAGContext::getTableRegionsInfoByTableID(Int64 table_id) const
{
return tables_regions_info.getTableRegionInfoByTableID(table_id);
}

} // namespace DB
9 changes: 5 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGContext.h
@@ -13,6 +13,7 @@
#include <Common/LogWithPrefix.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/TablesRegionsInfo.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Storages/Transaction/TiDB.h>

@@ -199,8 +200,9 @@ class DAGContext

std::pair<bool, double> getTableScanThroughput();

const RegionInfoMap & getRegionsForLocalRead() const { return regions_for_local_read; }
const RegionInfoList & getRegionsForRemoteRead() const { return regions_for_remote_read; }
const SingleTableRegions & getTableRegionsInfoByTableID(Int64 table_id) const;

bool containsRegionsInfoForTable(Int64 table_id) const;

const BlockIO & getBlockIO() const
{
@@ -246,8 +248,7 @@ class DAGContext
bool is_root_mpp_task = false;
bool is_batch_cop = false;
MPPTunnelSetPtr tunnel_set;
RegionInfoMap regions_for_local_read;
RegionInfoList regions_for_remote_read;
TablesRegionsInfo tables_regions_info;
// part of regions_for_local_read + regions_for_remote_read, only used for batch-cop
RegionInfoList retry_regions;

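Note: the DAGContext change above replaces the flat regions_for_local_read/regions_for_remote_read members with a per-physical-table TablesRegionsInfo, so a PartitionTableScan can look up each partition's regions by table id. A rough illustrative model of that grouping follows; field and type names are simplified guesses, not the actual TiFlash declarations.

#include <cstdint>
#include <unordered_map>
#include <vector>

// Simplified stand-in; the real RegionInfo also carries key ranges.
using RegionID = uint64_t;
using TableID = int64_t;
struct RegionInfoSketch
{
    RegionID region_id;
    uint64_t version;
    uint64_t conf_ver;
};

// One physical table's regions: those read locally plus those left for remote retry.
struct SingleTableRegionsSketch
{
    std::unordered_map<RegionID, RegionInfoSketch> local_regions;
    std::vector<RegionInfoSketch> remote_regions;
};

// Regions grouped by physical table id. A non-partitioned query keeps a single
// entry (the "single table" mode used by the debug helpers above); a
// PartitionTableScan query holds one entry per physical partition.
struct TablesRegionsInfoSketch
{
    std::unordered_map<TableID, SingleTableRegionsSketch> table_regions;

    bool containsRegionsInfoForTable(TableID table_id) const { return table_regions.count(table_id) > 0; }

    size_t regionCount() const
    {
        size_t count = 0;
        for (const auto & entry : table_regions)
            count += entry.second.local_regions.size() + entry.second.remote_regions.size();
        return count;
    }
};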
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
@@ -102,7 +102,7 @@ try
if (!dag_context.retry_regions.empty())
{
coprocessor::BatchResponse response;
for (auto region : dag_context.retry_regions)
for (const auto & region : dag_context.retry_regions)
{
auto * retry_region = response.add_retry_regions();
retry_region->set_id(region.region_id);
7 changes: 6 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp
@@ -22,7 +22,8 @@ class Context;
bool isSourceNode(const tipb::Executor * root)
{
return root->tp() == tipb::ExecType::TypeJoin || root->tp() == tipb::ExecType::TypeTableScan
|| root->tp() == tipb::ExecType::TypeExchangeReceiver || root->tp() == tipb::ExecType::TypeProjection;
|| root->tp() == tipb::ExecType::TypeExchangeReceiver || root->tp() == tipb::ExecType::TypeProjection
|| root->tp() == tipb::ExecType::TypePartitionTableScan;
}

const static String SOURCE_NAME("source");
@@ -132,6 +133,10 @@ DAGQueryBlock::DAGQueryBlock(const tipb::Executor & root_, QueryBlockIDGenerator
{
GET_METRIC(tiflash_coprocessor_executor_count, type_ts).Increment();
}
else if (current->tp() == tipb::ExecType::TypePartitionTableScan)
{
GET_METRIC(tiflash_coprocessor_executor_count, type_partition_ts).Increment();
}
}

/// construct DAGQueryBlock from a list struct based executors, which is the
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.h
@@ -59,6 +59,7 @@ class DAGQueryBlock
std::vector<Int32> output_offsets;

bool isRootQueryBlock() const { return id == 1; };
bool isTableScanSource() const { return source->tp() == tipb::ExecType::TypeTableScan || source->tp() == tipb::ExecType::TypePartitionTableScan; }
};

} // namespace DB