Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#9456
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
windtalker authored and ti-chi-bot committed Sep 23, 2024
1 parent 62809fe commit b6067e0
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 0 deletions.
9 changes: 9 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,24 @@ void PhysicalJoin::probeSideTransform(DAGPipeline & probe_pipeline, Context & co
/// probe side streams
executeExpression(probe_pipeline, probe_side_prepare_actions, log, "append join key and join filters for probe side");
/// add join input stream
<<<<<<< HEAD
String join_probe_extra_info = fmt::format("join probe, join_executor_id = {}, scan_hash_map_after_probe = {}", execId(), needScanHashMapAfterProbe(join_ptr->getKind()));
join_ptr->initProbe(probe_pipeline.firstStream()->getHeader(),
probe_pipeline.streams.size());
=======
String join_probe_extra_info = fmt::format(
"join probe, join_executor_id = {}, scan_hash_map_after_probe = {}",
execId(),
needScanHashMapAfterProbe(join_ptr->getKind()));
join_ptr->initProbe(probe_pipeline.firstStream()->getHeader(), probe_pipeline.streams.size());
>>>>>>> 78bd3f04dc (fix tiflash assert failure (#9456))
size_t probe_index = 0;
for (auto & stream : probe_pipeline.streams)
{
stream = std::make_shared<HashJoinProbeBlockInputStream>(stream, join_ptr, probe_index++, log->identifier(), settings.max_block_size);
stream->setExtraInfo(join_probe_extra_info);
}
join_ptr->setCancellationHook([&] { return context.isCancelled(); });
}

void PhysicalJoin::buildSideTransform(DAGPipeline & build_pipeline, Context & context, size_t max_streams)
Expand Down
45 changes: 45 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalJoinBuild.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Executor/PipelineExecutorContext.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Flash/Planner/Plans/PhysicalJoinBuild.h>
#include <Interpreters/Context.h>
#include <Operators/HashJoinBuildSink.h>

namespace DB
{
/// Wires the build side of a hash join into the pipeline execution group.
///
/// In order, this:
///   1. applies `prepare_actions` to the group via `executeExpression`;
///   2. installs one `HashJoinBuildSink` per builder, each with its own build index;
///   3. records the group's current profile infos as this executor's join build profile infos;
///   4. initialises the join's build state from the group's current header and concurrency;
///   5. releases this plan node's reference to the join.
///
/// The `concurrency` argument is not used by this implementation.
void PhysicalJoinBuild::buildPipelineExecGroupImpl(
    PipelineExecutorContext & exec_context,
    PipelineExecGroupBuilder & group_builder,
    Context & context,
    size_t /*concurrency*/)
{
    executeExpression(exec_context, group_builder, prepare_actions, log);

    // Indices are handed out in the order `transform` visits the builders.
    size_t next_build_index = 0;
    assert(join_ptr);
    group_builder.transform([&exec_context, &next_build_index, this](auto & builder) {
        const size_t sink_index = next_build_index++;
        builder.setSinkOp(
            std::make_unique<HashJoinBuildSink>(exec_context, log->identifier(), join_ptr, sink_index));
    });

    // Publish the build-side profile infos under this executor's id.
    auto * dag_context = context.getDAGContext();
    auto & execute_info = dag_context->getJoinExecuteInfoMap()[execId()];
    execute_info.join_build_profile_infos = group_builder.getCurProfileInfos();

    join_ptr->initBuild(group_builder.getCurrentHeader(), group_builder.concurrency());
    join_ptr->setInitActiveBuildThreads();

    // The sinks created above hold the join from here on; drop this node's own reference.
    join_ptr.reset();
}
} // namespace DB
67 changes: 67 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalJoinProbe.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Executor/PipelineExecutorContext.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Flash/Planner/Plans/PhysicalJoinProbe.h>
#include <Interpreters/Context.h>
#include <Operators/HashJoinProbeTransformOp.h>

namespace DB
{
/// Wires the probe side of a hash join into the pipeline execution group.
///
/// In order, this:
///   1. rejects fine grained shuffle, which the probe does not support;
///   2. when the join has spilled and the group runs with a single concurrency, widens the
///      group to at least two via `restoreConcurrency`;
///   3. applies `prepare_actions` to the group via `executeExpression`;
///   4. initialises the join's probe state from the resulting header and concurrency;
///   5. appends one `HashJoinProbeTransformOp` per builder, each with its own probe index;
///   6. registers the probe-finished future and a cancellation hook, then releases this
///      plan node's reference to the join.
///
/// `join_ptr` must be non-null on entry.
void PhysicalJoinProbe::buildPipelineExecGroupImpl(
    PipelineExecutorContext & exec_context,
    PipelineExecGroupBuilder & group_builder,
    Context & context,
    size_t concurrency)
{
    // Currently join probe does not support fine grained shuffle.
    RUNTIME_CHECK(!fine_grained_shuffle.enabled());

    // Check the join handle before its first dereference (the spill check right below).
    assert(join_ptr);

    if (join_ptr->isSpilled() && group_builder.concurrency() == 1)
    {
        // When the join build operator spilled, the probe operator requires at least two or more threads to restore spilled hash partitions.
        // The template argument is spelled out because the literal is an `int` while `concurrency` is a `size_t`.
        const size_t restore_concurrency = std::max<size_t>(2, concurrency);
        restoreConcurrency(
            exec_context,
            group_builder,
            restore_concurrency,
            context.getSettingsRef().max_buffered_bytes_in_executor,
            log);
    }

    executeExpression(exec_context, group_builder, prepare_actions, log);

    auto input_header = group_builder.getCurrentHeader();
    join_ptr->initProbe(input_header, group_builder.concurrency());

    // Indices are handed out in the order `transform` visits the builders.
    size_t probe_index = 0;
    const auto & max_block_size = context.getSettingsRef().max_block_size;
    group_builder.transform([&](auto & builder) {
        builder.appendTransformOp(std::make_unique<HashJoinProbeTransformOp>(
            exec_context,
            log->identifier(),
            join_ptr,
            probe_index++,
            max_block_size,
            input_header));
    });

    // The `join_ptr->wait_build_finished_future` does not need to be added to exec_context here;
    // it is only necessary to add it during the "restore build stage."
    // The order of build/probe here is ensured by the event.
    exec_context.addOneTimeFuture(join_ptr->wait_probe_finished_future);

    // The hook is stored on the join, so capture only what it needs.
    // NOTE(review): it keeps a reference to `exec_context`; assumes the executor context outlives the join — confirm.
    join_ptr->setCancellationHook([&exec_context]() { return exec_context.isCancelled(); });

    // The transform ops created above hold the join from here on; drop this node's own reference.
    join_ptr.reset();
}
} // namespace DB
21 changes: 21 additions & 0 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,24 @@ Block Join::doJoinBlockHash(ProbeProcessInfo & probe_process_info) const
return block;
}

<<<<<<< HEAD
=======
/// Projects `block` down to the columns listed in `output_columns_after_finalize`.
///
/// The selected columns are moved out of `block` (looked up by name) and inserted into the
/// result in the order of `output_columns_after_finalize`, so `block` must not be relied on
/// afterwards. An empty `block` (the cancelled case) is returned as is.
Block Join::removeUselessColumn(Block & block) const
{
    if (block)
    {
        Block result;
        for (const auto & output_column : output_columns_after_finalize)
            result.insert(std::move(block.getByName(output_column.name)));
        return result;
    }

    // cancelled: nothing to project
    return block;
}

>>>>>>> 78bd3f04dc (fix tiflash assert failure (#9456))
Block Join::joinBlockHash(ProbeProcessInfo & probe_process_info) const
{
std::vector<Block> result_blocks;
Expand Down Expand Up @@ -1581,6 +1599,9 @@ Block Join::joinBlock(ProbeProcessInfo & probe_process_info, bool dry_run) const
else
block = joinBlockHash(probe_process_info);

// if cancelled, just return empty block
if (!block)
return block;
/// for (cartesian)antiLeftSemi join, the meaning of "match-helper" is `non-matched` instead of `matched`.
if (kind == LeftOuterAnti || kind == Cross_LeftOuterAnti)
{
Expand Down

0 comments on commit b6067e0

Please sign in to comment.