-
Notifications
You must be signed in to change notification settings - Fork 411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature Branch: merge master to planner_refactory
branch
#5353
Feature Branch: merge master to planner_refactory
branch
#5353
Conversation
* Add to_seconds support for tiflash Signed-off-by: yibin <[email protected]> * Fix format issue Signed-off-by: yibin <[email protected]> * Add mutex lock to protect async reciever async reader Signed-off-by: yibin <[email protected]> * Fix a rebase issue Signed-off-by: yibin <[email protected]> * Change raw pointer to unique ptr Signed-off-by: yibin <[email protected]> * Fix format issue Signed-off-by: yibin <[email protected]> Co-authored-by: Ti Chi Robot <[email protected]>
…gionKVStoreTest*'` (pingcap#4903) close pingcap#4904
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
planner_refactory
branch
/rebuild |
/// todo support fine grained shuffle | ||
static auto disable_fine_frained_shuffle = [](const DAGQueryBlock & query_block) { | ||
return !enableFineGrainedShuffle(query_block.source->fine_grained_shuffle_stream_count()) | ||
&& (!query_block.exchange_sender || !enableFineGrainedShuffle(query_block.exchange_sender->fine_grained_shuffle_stream_count())); | ||
}; | ||
return query_block.source | ||
&& (query_block.source->tp() == tipb::ExecType::TypeProjection | ||
|| query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver); | ||
|| query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver) | ||
&& disable_fine_frained_shuffle(query_block); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for fine grained shuffle
#5048
@@ -69,7 +68,7 @@ void PhysicalExchangeReceiver::transformImpl(DAGPipeline & pipeline, Context & c | |||
auto & exchange_receiver_io_input_streams = dag_context.getInBoundIOInputStreamsMap()[executor_id]; | |||
for (size_t i = 0; i < max_streams; ++i) | |||
{ | |||
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(mpp_exchange_receiver, log->identifier(), executor_id); | |||
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(mpp_exchange_receiver, log->identifier(), executor_id, /*stream_id=*/0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for fine grained shuffle
#5048
/// todo support fine grained shuffle | ||
int stream_id = 0; | ||
pipeline.transform([&](auto & stream) { | ||
// construct writer | ||
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr>>( | ||
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr, false>>( | ||
dag_context.tunnel_set, | ||
partition_col_ids, | ||
partition_col_collators, | ||
exchange_type, | ||
context.getSettingsRef().dag_records_per_chunk, | ||
context.getSettingsRef().batch_send_min_limit, | ||
stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response | ||
dag_context); | ||
dag_context, | ||
0, | ||
0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for fine grained shuffle
#5048
} | ||
CATCH | ||
|
||
/// todo support FineGrainedShuffle |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for fine grained shuffle
#5048
if (pipeline.streams.size() > 1 || pipeline.streams_with_non_joined_data.size() > 1) | ||
{ | ||
const Settings & settings = context.getSettingsRef(); | ||
BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log); | ||
pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>( | ||
BlockInputStreamPtr stream = std::make_shared<ParallelAggregatingBlockInputStream>( | ||
pipeline.streams, | ||
stream_with_non_joined_data, | ||
pipeline.streams_with_non_joined_data, | ||
params, | ||
context.getFileProvider(), | ||
true, | ||
max_streams, | ||
settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads) : static_cast<size_t>(settings.max_threads), | ||
log->identifier()); | ||
|
||
pipeline.streams.resize(1); | ||
pipeline.streams_with_non_joined_data.clear(); | ||
pipeline.firstStream() = std::move(stream); | ||
|
||
// should record for agg before restore concurrency. See #3804. | ||
recordProfileStreams(pipeline, context); | ||
restoreConcurrency(pipeline, context.getDAGContext()->final_concurrency, log); | ||
} | ||
else | ||
{ | ||
BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log); | ||
BlockInputStreams inputs; | ||
if (!pipeline.streams.empty()) | ||
inputs.push_back(pipeline.firstStream()); | ||
else | ||
pipeline.streams.resize(1); | ||
if (stream_with_non_joined_data) | ||
inputs.push_back(stream_with_non_joined_data); | ||
|
||
if (!pipeline.streams_with_non_joined_data.empty()) | ||
inputs.push_back(pipeline.streams_with_non_joined_data.at(0)); | ||
|
||
pipeline.streams.resize(1); | ||
pipeline.streams_with_non_joined_data.clear(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from #5274
/rebuild |
/merge |
@SeaRise: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
This pull request has been accepted and is ready to merge. Commit hash: b8a6178
|
/rebuild |
/run-integration-test |
Coverage for changed files
Coverage summary
full coverage report (for internal network access only) |
/run-integration-test |
6 similar comments
/run-integration-test |
/run-integration-test |
/run-integration-test |
/run-integration-test |
/run-integration-test |
/run-integration-test |
What problem does this PR solve?
Issue Number: ref #4739
close #5350
Problem Summary:
What is changed and how it works?
We can use #5350 to check if the modification is correct
Check List
Tests
Side effects
Documentation
Release note