Skip to content

Commit

Permalink
Interpreter: Run interpreter test without exchange executors (#4788)
Browse files Browse the repository at this point in the history
ref #4609
  • Loading branch information
ywqzzy authored May 10, 2022
1 parent 0e2b7f7 commit f601de1
Show file tree
Hide file tree
Showing 15 changed files with 445 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <TestUtils/MockTableScanBlockInputStream.h>
#include <DataStreams/MockTableScanBlockInputStream.h>

namespace DB
{
Expand All @@ -32,7 +32,7 @@ MockTableScanBlockInputStream::MockTableScanBlockInputStream(ColumnsWithTypeAndN
}
}

ColumnPtr MockTableScanBlockInputStream::makeColumn(ColumnWithTypeAndName elem)
ColumnPtr MockTableScanBlockInputStream::makeColumn(ColumnWithTypeAndName elem) const
{
auto column = elem.type->createColumn();
size_t row_count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MockTableScanBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
ColumnPtr makeColumn(ColumnWithTypeAndName elem);
ColumnPtr makeColumn(ColumnWithTypeAndName elem) const;
};

} // namespace DB
27 changes: 27 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,30 @@ class DAGContext
, max_recorded_error_count(max_error_count_)
, warnings(max_recorded_error_count)
, warning_count(0)
, is_test(true)
{}

// for tests need to run query tasks.
explicit DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency)
: dag_request(&dag_request_)
, initialize_concurrency(concurrency)
, is_mpp_task(false)
, is_root_mpp_task(false)
, tunnel_set(nullptr)
, log(Logger::get(log_identifier))
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
, is_test(true)
{
assert(dag_request->has_root_executor() || dag_request->executors_size() > 0);
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();

initOutputInfo();
}

void attachBlockIO(const BlockIO & io_);
std::unordered_map<String, BlockInputStreams> & getProfileStreamsMap();

Expand Down Expand Up @@ -275,6 +297,8 @@ class DAGContext
return sql_mode & f;
}

bool isTest() const { return is_test; }

void cancelAllExchangeReceiver();

void initExchangeReceiverIfMPP(Context & context, size_t max_streams);
Expand All @@ -287,6 +311,7 @@ class DAGContext
const tipb::DAGRequest * dag_request;
Int64 compile_time_ns = 0;
size_t final_concurrency = 1;
size_t initialize_concurrency = 1;
bool has_read_wait_index = false;
Clock::time_point read_wait_index_start_timestamp{Clock::duration::zero()};
Clock::time_point read_wait_index_end_timestamp{Clock::duration::zero()};
Expand Down Expand Up @@ -345,6 +370,8 @@ class DAGContext
/// vector of SubqueriesForSets(such as join build subquery).
/// The order of the vector is also the order of the subquery.
std::vector<SubqueriesForSets> subqueries;

bool is_test = false; /// switch for test, do not use it in production.
};

} // namespace DB
28 changes: 22 additions & 6 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <DataStreams/HashJoinProbeBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/MockTableScanBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/ParallelAggregatingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
Expand All @@ -37,6 +38,7 @@
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/ExchangeSenderInterpreterHelper.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
Expand All @@ -46,6 +48,7 @@
#include <Interpreters/Join.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Storages/Transaction/TiDB.h>
#include <WindowFunctions/WindowFunctionFactory.h>

namespace DB
Expand Down Expand Up @@ -96,7 +99,8 @@ AnalysisResult analyzeExpressions(
AnalysisResult res;
ExpressionActionsChain chain;
// selection on table scan had been executed in handleTableScan
if (query_block.selection && !query_block.isTableScanSource())
// In test mode, filter is not pushed down to table scan
if (query_block.selection && (!query_block.isTableScanSource() || context.getDAGContext()->isTest()))
{
std::vector<const tipb::Expr *> where_conditions;
for (const auto & c : query_block.selection->selection().conditions())
Expand Down Expand Up @@ -153,6 +157,19 @@ AnalysisResult analyzeExpressions(
}
} // namespace

// for tests, we need to mock tableScan blockInputStream as the source stream.
void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
{
auto names_and_types = genNamesAndTypes(table_scan);
auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
for (size_t i = 0; i < max_streams; ++i)
{
auto mock_table_scan_stream = std::make_shared<MockTableScanBlockInputStream>(columns_with_type_and_name, context.getSettingsRef().max_block_size);
pipeline.streams.emplace_back(mock_table_scan_stream);
}
}

void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
{
const auto push_down_filter = PushDownFilter::toPushDownFilter(query_block.selection);
Expand Down Expand Up @@ -752,7 +769,10 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
else if (query_block.isTableScanSource())
{
TiDBTableScan table_scan(query_block.source, query_block.source_name, dagContext());
handleTableScan(table_scan, pipeline);
if (dagContext().isTest())
handleMockTableScan(table_scan, pipeline);
else
handleTableScan(table_scan, pipeline);
dagContext().table_scan_executor_id = query_block.source_name;
}
else if (query_block.source->tp() == tipb::ExecType::TypeWindow)
Expand Down Expand Up @@ -799,14 +819,12 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
// execute aggregation
executeAggregation(pipeline, res.before_aggregation, res.aggregation_keys, res.aggregation_collators, res.aggregate_descriptions, res.is_final_agg);
}

if (res.before_having)
{
// execute having
executeWhere(pipeline, res.before_having, res.having_column_name);
recordProfileStreams(pipeline, query_block.having_name);
}

if (res.before_order_and_select)
{
executeExpression(pipeline, res.before_order_and_select);
Expand All @@ -821,14 +839,12 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)

// execute final project action
executeProject(pipeline, final_project);

// execute limit
if (query_block.limit_or_topn && query_block.limit_or_topn->tp() == tipb::TypeLimit)
{
executeLimit(pipeline);
recordProfileStreams(pipeline, query_block.limit_or_topn_name);
}

restorePipelineConcurrency(pipeline);

// execute exchange_sender
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/DAGStorageInterpreter.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
Expand Down Expand Up @@ -57,6 +58,7 @@ class DAGQueryBlockInterpreter
private:
#endif
void executeImpl(DAGPipeline & pipeline);
void handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline);
void handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline);
void handleJoin(const tipb::Join & join, DAGPipeline & pipeline, SubqueryForSet & right_query);
void prepareJoin(
Expand Down
56 changes: 56 additions & 0 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Storages/MutableSupport.h>

namespace DB
{
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan)
{
NamesAndTypes names_and_types;
names_and_types.reserve(table_scan.getColumnSize());
for (Int32 i = 0; i < table_scan.getColumnSize(); ++i)
{
TiDB::ColumnInfo column_info;
const auto & ci = table_scan.getColumns()[i];
column_info.tp = static_cast<TiDB::TP>(ci.tp());
column_info.id = ci.column_id();

switch (column_info.id)
{
case TiDBPkColumnID:
// TODO: need to check if the type of pk_handle_columns matches the type that used in delta merge tree.
names_and_types.emplace_back(MutableSupport::tidb_pk_column_name, getDataTypeByColumnInfoForComputingLayer(column_info));
break;
case ExtraTableIDColumnID:
names_and_types.emplace_back(MutableSupport::extra_table_id_column_name, MutableSupport::extra_table_id_column_type);
break;
default:
names_and_types.emplace_back(fmt::format("mock_table_scan_{}", i), getDataTypeByColumnInfoForComputingLayer(column_info));
}
}
return names_and_types;
}

ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types)
{
std::vector<DB::ColumnWithTypeAndName> column_with_type_and_names;
column_with_type_and_names.reserve(names_and_types.size());
for (const auto & col : names_and_types)
{
column_with_type_and_names.push_back(DB::ColumnWithTypeAndName(col.type, col.name));
}
return column_with_type_and_names;
}
} // namespace DB
26 changes: 26 additions & 0 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Core/ColumnsWithTypeAndName.h>
#include <Core/NamesAndTypes.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/Transaction/TiDB.h>

namespace DB
{
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan);
ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types);
} // namespace DB
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
const Settings & settings = context.getSettingsRef();
if (dagContext().isBatchCop() || dagContext().isMPPTask())
max_streams = settings.max_threads;
else if (dagContext().isTest())
max_streams = dagContext().initialize_concurrency;
else
max_streams = 1;

if (max_streams > 1)
{
max_streams *= settings.max_streams_to_max_threads_ratio;
Expand Down Expand Up @@ -79,7 +82,6 @@ BlockIO InterpreterDAG::execute()
BlockInputStreams streams = executeQueryBlock(*dag.getRootQueryBlock());
DAGPipeline pipeline;
pipeline.streams = streams;

/// add union to run in parallel if needed
if (dagContext().isMPPTask())
/// MPPTask do not need the returned blocks.
Expand All @@ -95,7 +97,6 @@ BlockIO InterpreterDAG::execute()
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
dagContext().log->identifier());
}

BlockIO res;
res.in = pipeline.firstStream();
return res;
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

#include <Flash/Coprocessor/DAGContext.h>

#include <vector>

namespace DB
{
/// TiDBTableScan is a wrap to hide the difference of `TableScan` and `PartitionTableScan`
Expand Down
Loading

0 comments on commit f601de1

Please sign in to comment.