Skip to content

Commit

Permalink
MPP: Fix bug when building disaggregated regions info of mpp dispatch…
Browse files Browse the repository at this point in the history
… task (#6512)

close #6513
  • Loading branch information
JaySon-Huang authored Dec 21, 2022
1 parent e88ddaa commit cd8e8c1
Show file tree
Hide file tree
Showing 49 changed files with 324 additions and 95 deletions.
1 change: 1 addition & 0 deletions dbms/src/DataStreams/ExchangeSenderBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/FailPoint.h>
#include <DataStreams/ExchangeSenderBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>

namespace DB
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Debug/dbgFuncCoprocessorUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Debug/dbgFuncCoprocessorUtils.h>
#include <Flash/Coprocessor/DAGContext.h>

namespace DB
{
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Debug/dbgFuncCoprocessorUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <Flash/Coprocessor/ArrowChunkCodec.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DefaultChunkCodec.h>
#include <Interpreters/Context.h>
#include <Interpreters/sortBlock.h>
Expand Down
8 changes: 2 additions & 6 deletions dbms/src/Debug/dbgQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,7 @@ tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest

table_regions_info.local_regions.emplace(region_id, RegionInfo(region_id, region_version, region_conf_version, std::move(key_ranges), nullptr));

DAGContext dag_context(dag_request);
dag_context.tables_regions_info = std::move(tables_regions_info);
dag_context.log = log;
DAGContext dag_context(dag_request, std::move(tables_regions_info), "", false, log);
context.setDAGContext(&dag_context);

DAGDriver driver(context, start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);
Expand Down Expand Up @@ -330,9 +328,7 @@ bool runAndCompareDagReq(const coprocessor::Request & req, const coprocessor::Re
auto & table_regions_info = tables_regions_info.getSingleTableRegions();
table_regions_info.local_regions.emplace(region_id, RegionInfo(region_id, region->version(), region->confVer(), std::move(key_ranges), nullptr));

DAGContext dag_context(dag_request);
dag_context.tables_regions_info = std::move(tables_regions_info);
dag_context.log = log;
DAGContext dag_context(dag_request, std::move(tables_regions_info), "", false, log);
context.setDAGContext(&dag_context);
DAGDriver driver(context, properties.start_ts, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, &dag_response, true);
driver.execute();
Expand Down
12 changes: 7 additions & 5 deletions dbms/src/Flash/BatchCoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/TiFlashMetrics.h>
#include <Flash/BatchCoprocessorHandler.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/ServiceUtils.h>
Expand Down Expand Up @@ -68,11 +69,12 @@ grpc::Status BatchCoprocessorHandler::execute()
tables_regions_info.tableCount(),
dag_request.DebugString());

DAGContext dag_context(dag_request);
dag_context.is_batch_cop = true;
dag_context.tables_regions_info = std::move(tables_regions_info);
dag_context.log = Logger::get("BatchCoprocessorHandler");
dag_context.tidb_host = cop_context.db_context.getClientInfo().current_address.toString();
DAGContext dag_context(
dag_request,
std::move(tables_regions_info),
cop_context.db_context.getClientInfo().current_address.toString(),
/*is_batch_cop=*/true,
Logger::get("BatchCoprocessorHandler"));
cop_context.db_context.setDAGContext(&dag_context);

DAGDriver<true> driver(cop_context.db_context, cop_request->start_ts() > 0 ? cop_request->start_ts() : dag_request.start_ts_fallback(), cop_request->schema_ver(), writer);
Expand Down
12 changes: 8 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,17 @@ class DAGContext
{
public:
// for non-mpp(cop/batchCop)
explicit DAGContext(const tipb::DAGRequest & dag_request_)
explicit DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, tidb_host(tidb_host_)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, is_mpp_task(false)
, is_root_mpp_task(false)
, is_batch_cop(is_batch_cop_)
, tables_regions_info(std::move(tables_regions_info_))
, log(std::move(log_))
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, max_recorded_error_count(getMaxErrorCount(*dag_request))
Expand Down Expand Up @@ -345,9 +349,9 @@ class DAGContext
String tidb_host = "Unknown";
bool collect_execution_summaries{};
bool return_executor_id{};
bool is_mpp_task = false;
bool is_root_mpp_task = false;
bool is_batch_cop = false;
/* const */ bool is_mpp_task = false;
/* const */ bool is_root_mpp_task = false;
/* const */ bool is_batch_cop = false;
// `tunnel_set` is always set by `MPPTask` and is intended to be used for `DAGQueryBlockInterpreter`.
MPPTunnelSetPtr tunnel_set;
TablesRegionsInfo tables_regions_info;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/ExecutionSummaryCollector.h>
#include <Flash/Coprocessor/StreamWriter.h>
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <tipb/executor.pb.h>
#pragma GCC diagnostic pop

#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQueryBlock.h>
#include <Flash/Coprocessor/DAGSet.h>
#include <Flash/Coprocessor/DAGUtils.h>
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <tipb/select.pb.h>
#pragma GCC diagnostic pop

#include <Flash/Coprocessor/DAGContext.h>
#include <Interpreters/IQuerySource.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/TiKVKeyValue.h>
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQuerySource.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@

#pragma once

#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQueryBlock.h>
#include <Interpreters/Context.h>
#include <Interpreters/IQuerySource.h>

namespace DB
{
class DAGContext;

/// DAGQuerySource is an adaptor between DAG and CH's executeQuery.
/// TODO: consider to directly use DAGContext instead.
class DAGQuerySource : public IQuerySource
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGResponseWriter.h>

namespace DB
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

#pragma once

#include <Flash/Coprocessor/DAGContext.h>
#include <common/types.h>
#include <tipb/select.pb.h>

namespace DB
{
class DAGContext;

class DAGResponseWriter
{
public:
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/FineGrainedShuffle.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include <Flash/Coprocessor/DAGContext.h>
#include <common/types.h>
#include <tipb/executor.pb.h>

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#pragma GCC diagnostic pop

#include <DataStreams/BlockIO.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Interpreters/IInterpreter.h>

Expand All @@ -30,6 +29,7 @@ namespace DB
class Context;
class Region;
using RegionPtr = std::shared_ptr<Region>;
class DAGContext;

/** build ch plan from dag request: dag executors -> ch plan
*/
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Flash/Coprocessor/MockSourceStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <Core/NamesAndTypes.h>
#include <DataStreams/MockExchangeReceiverInputStream.h>
#include <DataStreams/MockTableScanBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Interpreters/Context.h>

namespace DB
Expand Down Expand Up @@ -69,4 +68,4 @@ std::pair<NamesAndTypes, std::vector<std::shared_ptr<SourceType>>> mockSourceStr
RUNTIME_ASSERT(start == rows, log, "mock source streams' total size must same as user input");
return {names_and_types, mock_source_streams};
}
} // namespace DB
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/RemoteRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/FmtUtils.h>
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Storages/MutableSupport.h>
#include <common/logger_useful.h>
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/RemoteRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/RegionInfo.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/Transaction/TiDB.h>

Expand Down
78 changes: 78 additions & 0 deletions dbms/src/Flash/Coprocessor/RequestUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Exception.h>

#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
#pragma clang diagnostic ignored "-Wunused-parameter"
#endif
#include <pingcap/coprocessor/Client.h>
#ifdef __clang__
#pragma clang diagnostic pop
#endif

namespace DB::RequestUtils
{
template <typename PbRegion>
void setUpRegion(const pingcap::coprocessor::RegionInfo & region_info, PbRegion * region)
{
region->set_region_id(region_info.region_id.id);
region->mutable_region_epoch()->set_version(region_info.region_id.ver);
region->mutable_region_epoch()->set_conf_ver(region_info.region_id.conf_ver);
for (const auto & key_range : region_info.ranges)
{
key_range.setKeyRange(region->add_ranges());
}
}

template <typename RequestPtr>
std::vector<pingcap::kv::RegionVerID>
setUpRegionInfos(const pingcap::coprocessor::BatchCopTask & batch_cop_task, const RequestPtr & req)
{
RUNTIME_CHECK_MSG(batch_cop_task.region_infos.empty() != batch_cop_task.table_regions.empty(),
"region_infos and table_regions should not exist at the same time, single table region info: {}, partition table region info: {}",
batch_cop_task.region_infos.size(),
batch_cop_task.table_regions.size());

std::vector<pingcap::kv::RegionVerID> region_ids;
if (!batch_cop_task.region_infos.empty())
{
// For non-partition table
region_ids.reserve(batch_cop_task.region_infos.size());
for (const auto & region_info : batch_cop_task.region_infos)
{
region_ids.push_back(region_info.region_id);
setUpRegion(region_info, req->add_regions());
}
return region_ids;
}
// For partition table
for (const auto & table_region : batch_cop_task.table_regions)
{
auto * req_table_region = req->add_table_regions();
req_table_region->set_physical_table_id(table_region.physical_table_id);
for (const auto & region_info : table_region.region_infos)
{
region_ids.push_back(region_info.region_id);
setUpRegion(region_info, req_table_region->add_regions());
}
}
return region_ids;
}

} // namespace DB::RequestUtils
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Common/TiFlashException.h>
#include <Flash/Coprocessor/ArrowChunkCodec.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DefaultChunkCodec.h>
#include <Flash/Coprocessor/StreamWriter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
#include <Core/Types.h>
#include <DataTypes/IDataType.h>
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGResponseWriter.h>
#include <Flash/Mpp/TrackedMppDataPacket.h>
#include <common/logger_useful.h>

namespace DB
{
class DAGContext;

/// Serializes the stream of blocks and sends them to TiDB/TiSpark with different serialization paths.
template <class StreamWriterPtr>
class StreamingDAGResponseWriter : public DAGResponseWriter
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/TiDBTableScan.h>

namespace DB
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

#pragma once

#include <Flash/Coprocessor/DAGContext.h>
#include <Storages/Transaction/TypeMapping.h>
#include <common/types.h>
#include <tipb/executor.pb.h>

namespace DB
{

class DAGContext;
using ColumnInfos = std::vector<TiDB::ColumnInfo>;

/// TiDBTableScan is a wrap to hide the difference of `TableScan` and `PartitionTableScan`
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/UnaryDAGResponseWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Coprocessor/ArrowChunkCodec.h>
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DefaultChunkCodec.h>
#include <Flash/Coprocessor/ExecutionSummaryCollector.h>
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>
Expand Down
Loading

0 comments on commit cd8e8c1

Please sign in to comment.