Refine GetResultSink and use QueryExecutor for cop/batchCop #6830

48 changes: 0 additions & 48 deletions dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h

This file was deleted.

102 changes: 102 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
@@ -35,6 +35,98 @@ bool strictSqlMode(UInt64 sql_mode)
return sql_mode & TiDBSQLMode::STRICT_ALL_TABLES || sql_mode & TiDBSQLMode::STRICT_TRANS_TABLES;
}

// for non-mpp(cop/batchCop)
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, tidb_host(tidb_host_)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, is_mpp_task(false)
, is_root_mpp_task(false)
, is_batch_cop(is_batch_cop_)
, tables_regions_info(std::move(tables_regions_info_))
, log(std::move(log_))
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != dag_request->has_root_executor());
const auto & root_executor = dag_request->has_root_executor()
? dag_request->root_executor()
: dag_request->executors(dag_request->executors_size() - 1);
return_executor_id = root_executor.has_executor_id();
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
}

// for mpp
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, return_executor_id(true)
, is_mpp_task(true)
, is_root_mpp_task(is_root_mpp_task_)
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, mpp_task_meta(meta_)
, mpp_task_id(mpp_task_meta)
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
RUNTIME_CHECK(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());
root_executor_id = dag_request->root_executor().executor_id();
// only mpp task has join executor.
initExecutorIdToJoinIdMap();
initOutputInfo();
}

// for test
DAGContext::DAGContext(UInt64 max_error_count_)
: dag_request(nullptr)
, dummy_ast(makeDummyQuery())
, collect_execution_summaries(false)
, is_mpp_task(false)
, is_root_mpp_task(false)
, flags(0)
, sql_mode(0)
, max_recorded_error_count(max_error_count_)
, warnings(max_recorded_error_count)
, warning_count(0)
{}

// for tests that need to run query tasks.
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, initialize_concurrency(concurrency)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, is_mpp_task(false)
, is_root_mpp_task(false)
, log(Logger::get(log_identifier))
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != dag_request->has_root_executor());
const auto & root_executor = dag_request->has_root_executor()
? dag_request->root_executor()
: dag_request->executors(dag_request->executors_size() - 1);
return_executor_id = root_executor.has_executor_id();
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
}

void DAGContext::initOutputInfo()
{
output_field_types = collectOutputFieldTypes(*dag_request);
@@ -53,6 +145,16 @@ void DAGContext::initOutputInfo()
keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock;
}

String DAGContext::getRootExecutorId()
{
// If return_executor_id is false, we can get the generated executor_id from list_based_executors_order.
return return_executor_id
? root_executor_id
: (list_based_executors_order.empty()
? ""
: list_based_executors_order.back());
}

bool DAGContext::allowZeroInDate() const
{
return flags & TiDBSQLFlags::IGNORE_ZERO_IN_DATE;
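Note on the two non-MPP constructors above: a cop/batchCop DAGRequest carries its plan either as a flat executor list (executors) or as a tree (root_executor), and the RUNTIME_CHECK asserts that exactly one of the two forms is present. Below is a condensed sketch of the shared root-executor resolution, assuming the tipb accessors exactly as used above; the helper name resolveRootExecutor is illustrative, not part of this PR.

```cpp
// Sketch only: condenses the root-executor lookup that both cop/batchCop constructors perform.
// Assumes the TiFlash RUNTIME_CHECK macro and the tipb protobuf accessors used in the diff.
const tipb::Executor & resolveRootExecutor(const tipb::DAGRequest & req)
{
    // Exactly one plan representation must be present: tree-based XOR list-based.
    RUNTIME_CHECK((req.executors_size() > 0) != req.has_root_executor());
    return req.has_root_executor()
        ? req.root_executor()                       // tree-based plan: the root node
        : req.executors(req.executors_size() - 1);  // list-based plan: the last executor acts as root
}
```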
82 changes: 7 additions & 75 deletions dbms/src/Flash/Coprocessor/DAGContext.h
@@ -127,87 +127,16 @@ class DAGContext
{
public:
// for non-mpp(cop/batchCop)
explicit DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, tidb_host(tidb_host_)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, is_mpp_task(false)
, is_root_mpp_task(false)
, is_batch_cop(is_batch_cop_)
, tables_regions_info(std::move(tables_regions_info_))
, log(std::move(log_))
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
assert(dag_request->has_root_executor() || dag_request->executors_size() > 0);
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();

initOutputInfo();
}
DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_);

// for mpp
DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, return_executor_id(true)
, is_mpp_task(true)
, is_root_mpp_task(is_root_mpp_task_)
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, mpp_task_meta(meta_)
, mpp_task_id(mpp_task_meta)
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
assert(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());
// only mpp task has join executor.
initExecutorIdToJoinIdMap();
initOutputInfo();
}
DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_);

// for test
explicit DAGContext(UInt64 max_error_count_)
: dag_request(nullptr)
, dummy_ast(makeDummyQuery())
, collect_execution_summaries(false)
, is_mpp_task(false)
, is_root_mpp_task(false)
, flags(0)
, sql_mode(0)
, max_recorded_error_count(max_error_count_)
, warnings(max_recorded_error_count)
, warning_count(0)
{}
explicit DAGContext(UInt64 max_error_count_);

// for tests that need to run query tasks.
explicit DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, initialize_concurrency(concurrency)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, is_mpp_task(false)
, is_root_mpp_task(false)
, log(Logger::get(log_identifier))
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
assert(dag_request->has_root_executor() || dag_request->executors_size() > 0);
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();

initOutputInfo();
}
DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency);

std::unordered_map<String, BlockInputStreams> & getProfileStreamsMap();

@@ -335,6 +264,8 @@ class DAGContext

void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

String getRootExecutorId();

const tipb::DAGRequest * dag_request;
/// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast,
/// dummy_query_string and dummy_ast is used for that
@@ -350,6 +281,7 @@ class DAGContext
String tidb_host = "Unknown";
bool collect_execution_summaries{};
bool return_executor_id{};
String root_executor_id = "";
/* const */ bool is_mpp_task = false;
/* const */ bool is_root_mpp_task = false;
/* const */ bool is_batch_cop = false;
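For reference, the getRootExecutorId() declared above (defined in DAGContext.cpp earlier in this diff) resolves as sketched below; dag_context is an illustrative instance, not code from this PR.

```cpp
// Outcomes of DAGContext::getRootExecutorId(), per the definition shown above:
String root_id = dag_context.getRootExecutorId();
// - return_executor_id == true  -> the root_executor_id captured in the constructor.
// - return_executor_id == false -> the id generated for the last executor in
//   list_based_executors_order, or "" if that list has not been populated yet.
```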
53 changes: 21 additions & 32 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
@@ -17,14 +17,12 @@
#include <DataStreams/BlockIO.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/ExecutionSummaryCollector.h>
#include <Flash/Coprocessor/StreamWriter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>
#include <Flash/Executor/toRU.h>
#include <Flash/executeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
@@ -93,10 +91,9 @@ try
auto start_time = Clock::now();
DAGContext & dag_context = *context.getDAGContext();

// TODO use query executor for cop/batch cop.
BlockIO streams = executeAsBlockIO(context, internal);
if (!streams.in || streams.out)
// Only query is allowed, so streams.in must not be null and streams.out must be null
auto query_executor = queryExecute(context, internal);
if (!query_executor)
// Only query is allowed, so query_executor must not be null
throw TiFlashException("DAG is not query.", Errors::Coprocessor::Internal);

auto end_time = Clock::now();
@@ -107,12 +104,14 @@
BlockOutputStreamPtr dag_output_stream = nullptr;
if constexpr (!batch)
{
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<UnaryDAGResponseWriter>(
auto response_writer = std::make_unique<UnaryDAGResponseWriter>(
dag_response,
context.getSettingsRef().dag_records_per_chunk,
dag_context);
dag_output_stream = std::make_shared<DAGBlockOutputStream>(streams.in->getHeader(), std::move(response_writer));
copyData(*streams.in, *dag_output_stream);
response_writer->prepare(query_executor->getSampleBlock());
query_executor->execute([&response_writer](const Block & block) { response_writer->write(block); }).verify();
response_writer->flush();

if (dag_context.collect_execution_summaries)
{
ExecutionSummaryCollector summary_collector(dag_context);
@@ -136,14 +135,15 @@

auto streaming_writer = std::make_shared<StreamWriter>(writer);
TiDB::TiDBCollators collators;

std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<StreamWriterPtr>>(
auto response_writer = std::make_unique<StreamingDAGResponseWriter<StreamWriterPtr>>(
streaming_writer,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
dag_context);
dag_output_stream = std::make_shared<DAGBlockOutputStream>(streams.in->getHeader(), std::move(response_writer));
copyData(*streams.in, *dag_output_stream);
response_writer->prepare(query_executor->getSampleBlock());
query_executor->execute([&response_writer](const Block & block) { response_writer->write(block); }).verify();
response_writer->flush();

if (dag_context.collect_execution_summaries)
{
ExecutionSummaryCollector summary_collector(dag_context);
@@ -152,7 +152,7 @@
}
}

auto ru = toRU(streams.in->estimateCPUTimeNs());
auto ru = query_executor->collectRequestUnit();
if constexpr (!batch)
{
LOG_INFO(log, "cop finish with request unit: {}", ru);
@@ -181,24 +181,13 @@
}
}

if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(streams.in.get()))
{
LOG_DEBUG(
log,
"dag request without encode cost: {} seconds, produce {} rows, {} bytes.",
p_stream->getProfileInfo().execution_time / (double)1000000000,
p_stream->getProfileInfo().rows,
p_stream->getProfileInfo().bytes);

if constexpr (!batch)
{
// Under some test cases, there may be dag response whose size is bigger than INT_MAX, and GRPC can not limit it.
// Throw exception to prevent receiver from getting wrong response.
if (accurate::greaterOp(p_stream->getProfileInfo().bytes, std::numeric_limits<int>::max()))
throw TiFlashException("DAG response is too big, please check config about region size or region merge scheduler",
Errors::Coprocessor::Internal);
}
Comment on lines -193 to -200 (Contributor Author):
has moved to UnaryDAGResponseWriter
}
auto runtime_statistics = query_executor->getRuntimeStatistics();
LOG_DEBUG(
log,
"dag request without encode cost: {} seconds, produce {} rows, {} bytes.",
runtime_statistics.execution_time_ns / static_cast<double>(1000000000),
runtime_statistics.rows,
runtime_statistics.bytes);
}
catch (const RegionException & e)
{
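Taken together, the DAGDriver changes above replace the BlockIO/copyData/DAGBlockOutputStream pipeline with a QueryExecutor that pushes blocks into the response writer. Below is a condensed sketch of the new path, using only calls that appear in this diff (queryExecute, getSampleBlock, execute, collectRequestUnit, getRuntimeStatistics) and eliding the batchCop branch, execution-summary collection, and error handling.

```cpp
// Condensed sketch of the unary-cop path shown above; not a drop-in replacement.
auto query_executor = queryExecute(context, internal);              // was: executeAsBlockIO(context, internal)
if (!query_executor)
    throw TiFlashException("DAG is not query.", Errors::Coprocessor::Internal);

auto response_writer = std::make_unique<UnaryDAGResponseWriter>(
    dag_response, context.getSettingsRef().dag_records_per_chunk, dag_context);
response_writer->prepare(query_executor->getSampleBlock());          // was: DAGBlockOutputStream(streams.in->getHeader(), ...)
query_executor->execute([&](const Block & block) { response_writer->write(block); }).verify();
response_writer->flush();                                            // was: copyData(*streams.in, *dag_output_stream)

auto ru = query_executor->collectRequestUnit();                      // was: toRU(streams.in->estimateCPUTimeNs())
auto runtime_statistics = query_executor->getRuntimeStatistics();    // replaces IProfilingBlockInputStream profile info
```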
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp
@@ -32,7 +32,7 @@ DAGResponseWriter::DAGResponseWriter(
&& dag_context.encode_type != tipb::EncodeType::TypeDefault)
{
throw TiFlashException(
"Only Default/Arrow/CHBlock encode type is supported in DAGBlockOutputStream.",
"Only Default/Arrow/CHBlock encode type is supported in DAGResponseWriter.",
Errors::Coprocessor::Unimplemented);
}
}