Pipeline: support fine grained shuffle #6934

Merged

Changes from all commits (30 commits)
3e5c3fb
update partA
SeaRise Mar 2, 2023
60fcb90
part2
SeaRise Mar 2, 2023
884aefd
udpate
SeaRise Mar 2, 2023
d2520b6
fix comment
SeaRise Mar 2, 2023
c1362bd
add ut
SeaRise Mar 2, 2023
8aabbe8
fix
SeaRise Mar 2, 2023
700e04a
fix comment
SeaRise Mar 2, 2023
f704069
fix
SeaRise Mar 3, 2023
6035999
update
SeaRise Mar 3, 2023
63ca1c0
update
SeaRise Mar 3, 2023
e1220b1
Merge branch 'master' into support_fine_grained_shuffle_for_pipeline
SeaRise Mar 3, 2023
4b7f61d
merge master
SeaRise Mar 3, 2023
06c8004
add fullstack test
SeaRise Mar 3, 2023
f6b9936
update
SeaRise Mar 3, 2023
88b64c8
fix comment
SeaRise Mar 3, 2023
688cc41
add more comments
SeaRise Mar 6, 2023
6aef338
Merge branch 'master' into support_fine_grained_shuffle_for_pipeline
SeaRise Mar 7, 2023
d7effec
merge master and fix
SeaRise Mar 7, 2023
57a172c
Merge branch 'master' into support_fine_grained_shuffle_for_pipeline
SeaRise Mar 13, 2023
d86bfeb
replace 1024 with maxFineGrainedStreamCount
SeaRise Mar 13, 2023
dac6717
refine
SeaRise Mar 14, 2023
c1cba4e
u
SeaRise Mar 14, 2023
cd049c9
replace magic number
SeaRise Mar 14, 2023
15356ab
Merge branch 'master' into support_fine_grained_shuffle_for_pipeline
SeaRise Mar 14, 2023
08c3138
Update Pipeline.cpp
SeaRise Mar 14, 2023
0c891de
add more unit tests
SeaRise Mar 14, 2023
c65200c
Update ExecutorTestUtils.cpp
SeaRise Mar 14, 2023
4ae0cc5
address comment
SeaRise Mar 15, 2023
6f02e0e
Merge branch 'master' into support_fine_grained_shuffle_for_pipeline
ti-chi-bot Mar 20, 2023
3906d8b
Merge branch 'master' into support_fine_grained_shuffle_for_pipeline
ti-chi-bot Mar 20, 2023
38 changes: 36 additions & 2 deletions dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp
@@ -77,9 +77,43 @@ tipb::ExchangeType ExchangeSenderBinder::getType() const
return type;
}

ExecutorBinderPtr compileExchangeSender(ExecutorBinderPtr input, size_t & executor_index, tipb::ExchangeType exchange_type)
ExecutorBinderPtr compileExchangeSender(
ExecutorBinderPtr input,
size_t & executor_index,
tipb::ExchangeType exchange_type,
ASTPtr partition_key_list,
uint64_t fine_grained_shuffle_stream_count)
{
ExecutorBinderPtr exchange_sender = std::make_shared<mock::ExchangeSenderBinder>(executor_index, input->output_schema, exchange_type);
std::vector<size_t> partition_key_indexes;
for (const auto & partition_key : partition_key_list->children)
{
size_t schema_index = 0;
for (; schema_index < input->output_schema.size(); ++schema_index)
{
if (input->output_schema[schema_index].first == partition_key->getColumnName())
{
partition_key_indexes.push_back(schema_index);
break;
}
}
auto schema_string = [&]() {
FmtBuffer buffer;
buffer.joinStr(
input->output_schema.cbegin(),
input->output_schema.cend(),
[](const auto & item, FmtBuffer & buf) { buf.append(item.first); },
", ");
return buffer.toString();
};
if (schema_index == input->output_schema.size())
throw Exception(fmt::format("Unknown partition key: {}, schema is [{}]", partition_key->getColumnName(), schema_string()));
}
ExecutorBinderPtr exchange_sender = std::make_shared<mock::ExchangeSenderBinder>(
executor_index,
input->output_schema,
exchange_type,
partition_key_indexes,
fine_grained_shuffle_stream_count);
exchange_sender->children.push_back(input);
return exchange_sender;
}
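For illustration only (not part of the PR), the name-to-index resolution that the new compileExchangeSender overload performs can be sketched in isolation; the function and type names below are assumptions made for this example:

#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Resolve partition key names against the child's output schema, mirroring the
// lookup loop above; an unknown key raises an error naming the missing column.
std::vector<size_t> resolvePartitionKeys(
    const std::vector<std::string> & partition_keys,
    const std::vector<std::string> & schema)
{
    std::vector<size_t> indexes;
    for (const auto & key : partition_keys)
    {
        size_t i = 0;
        for (; i < schema.size(); ++i)
        {
            if (schema[i] == key)
            {
                indexes.push_back(i);
                break;
            }
        }
        if (i == schema.size())
            throw std::runtime_error("Unknown partition key: " + key);
    }
    return indexes;
}
// e.g. schema {"a", "b", "c"} with partition keys {"b", "c"} yields {1, 2}.
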
14 changes: 12 additions & 2 deletions dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h
@@ -22,7 +22,12 @@ namespace DB::mock
class ExchangeSenderBinder : public ExecutorBinder
{
public:
ExchangeSenderBinder(size_t & index, const DAGSchema & output, tipb::ExchangeType type_, const std::vector<size_t> & partition_keys_ = {}, uint64_t fine_grained_shuffle_stream_count_ = 0)
ExchangeSenderBinder(
size_t & index,
const DAGSchema & output,
tipb::ExchangeType type_,
const std::vector<size_t> & partition_keys_ = {},
uint64_t fine_grained_shuffle_stream_count_ = 0)
: ExecutorBinder(index, "exchange_sender_" + std::to_string(index), output)
, type(type_)
, partition_keys(partition_keys_)
@@ -42,5 +47,10 @@ class ExchangeSenderBinder : public ExecutorBinder
uint64_t fine_grained_shuffle_stream_count;
};

ExecutorBinderPtr compileExchangeSender(ExecutorBinderPtr input, size_t & executor_index, tipb::ExchangeType exchange_type);
ExecutorBinderPtr compileExchangeSender(
ExecutorBinderPtr input,
size_t & executor_index,
tipb::ExchangeType exchange_type,
ASTPtr partition_key_list = {},
uint64_t fine_grained_shuffle_stream_count = 0);
} // namespace DB::mock
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -783,7 +783,7 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
{
extra_info = String(enableFineGrainedShuffleExtraInfo);
RUNTIME_CHECK(exchange_sender.tp() == tipb::ExchangeType::Hash, ExchangeType_Name(exchange_sender.tp()));
RUNTIME_CHECK(stream_count <= 1024, stream_count);
RUNTIME_CHECK(stream_count <= maxFineGrainedStreamCount, stream_count);
}
pipeline.transform([&](auto & stream) {
// construct writer
7 changes: 7 additions & 0 deletions dbms/src/Flash/Coprocessor/FineGrainedShuffle.h
@@ -21,13 +21,20 @@ namespace DB
{
static constexpr std::string_view enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle";

static constexpr size_t maxFineGrainedStreamCount = 1024;

inline bool enableFineGrainedShuffle(uint64_t stream_count)
{
return stream_count > 0;
}

struct FineGrainedShuffle
{
FineGrainedShuffle()
: stream_count(0)
, batch_size(0)
{}

explicit FineGrainedShuffle(const tipb::Executor * executor)
: stream_count(executor ? executor->fine_grained_shuffle_stream_count() : 0)
, batch_size(executor ? executor->fine_grained_shuffle_batch_size() : 0)
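A minimal usage sketch of the additions above, assuming a valid `const tipb::Executor * executor` and the surrounding DB namespace (illustrative only; the RUNTIME_CHECK mirrors the one in DAGQueryBlockInterpreter.cpp):

// Default constructed: stream_count == 0, so enableFineGrainedShuffle() returns false.
FineGrainedShuffle disabled;

// Constructed from a tipb executor: stream count and batch size come from the plan node.
FineGrainedShuffle from_executor(executor);
if (enableFineGrainedShuffle(from_executor.stream_count))
    RUNTIME_CHECK(from_executor.stream_count <= maxFineGrainedStreamCount, from_executor.stream_count);
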
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Exec/PipelineExec.h
@@ -23,7 +23,7 @@ namespace DB
// The executor for push model operator.
// A pipeline will generate multiple pipeline_execs.
// data flow: source --> transform --> .. --> transform --> sink
class PipelineExec
class PipelineExec : private boost::noncopyable
{
public:
PipelineExec(
8 changes: 0 additions & 8 deletions dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h
@@ -18,8 +18,6 @@

namespace DB
{
class PipelineExecutorStatus;

struct PipelineExecBuilder
{
SourceOpPtr source_op;
@@ -41,12 +39,6 @@ struct PipelineExecGroupBuilder
using BuilderGroup = std::vector<PipelineExecBuilder>;
BuilderGroup group;

explicit PipelineExecGroupBuilder(PipelineExecutorStatus & exec_status_)
: exec_status(exec_status_)
{}

PipelineExecutorStatus & exec_status;

size_t concurrency = 0;

void init(size_t init_concurrency);
6 changes: 3 additions & 3 deletions dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp
@@ -60,12 +60,12 @@ class SimpleOperatorTestRunner : public DB::tests::ExecutorTest
PhysicalPlan physical_plan{*context.context, ""};
physical_plan.build(request.get());
assert(!result_handler.isIgnored());
auto plan_tree = PhysicalGetResultSink::build(std::move(result_handler), physical_plan.outputAndOptimize());
auto plan_tree = PhysicalGetResultSink::build(std::move(result_handler), Logger::get(), physical_plan.outputAndOptimize());

PipelineExecGroupBuilder group_builder{exec_status};
PipelineExecGroupBuilder group_builder;
PhysicalPlanVisitor::visitPostOrder(plan_tree, [&](const PhysicalPlanNodePtr & plan) {
assert(plan);
plan->buildPipelineExec(group_builder, *context.context, /*concurrency=*/1);
plan->buildPipelineExecGroup(exec_status, group_builder, *context.context, /*concurrency=*/1);
});
auto result = group_builder.build();
assert(result.size() == 1);
173 changes: 138 additions & 35 deletions dbms/src/Flash/Pipeline/Pipeline.cpp
@@ -18,6 +18,7 @@
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Flash/Pipeline/Pipeline.h>
#include <Flash/Pipeline/Schedule/Events/Event.h>
#include <Flash/Pipeline/Schedule/Events/FineGrainedPipelineEvent.h>
#include <Flash/Pipeline/Schedule/Events/PlainPipelineEvent.h>
#include <Flash/Planner/PhysicalPlanNode.h>
#include <Flash/Planner/Plans/PhysicalGetResultSink.h>
@@ -34,6 +35,85 @@ FmtBuffer & addPrefix(FmtBuffer & buffer, size_t level)
}
} // namespace

PipelineEvents::PipelineEvents(Events && events_, bool is_fine_grained_)
: events(std::move(events_))
, is_fine_grained(is_fine_grained_)
{
RUNTIME_CHECK(!events.empty());
// For non fine grained mode, the size of events must be 1.
RUNTIME_CHECK(is_fine_grained || events.size() == 1);
}

void PipelineEvents::mapInputs(const PipelineEvents & inputs)
{
/// The self events are the outputs.
if (inputs.is_fine_grained && is_fine_grained)
{
if (inputs.events.size() == events.size())
{
/**
* 1. If the number of partitions matches, use fine grained mapping here.
* ```
* FineGrainedPipelineEvent◄────FineGrainedPipelineEvent
* FineGrainedPipelineEvent◄────FineGrainedPipelineEvent
* FineGrainedPipelineEvent◄────FineGrainedPipelineEvent
* FineGrainedPipelineEvent◄────FineGrainedPipelineEvent
* ```
*/
size_t partition_num = inputs.events.size();
for (size_t index = 0; index < partition_num; ++index)
events[index]->addInput(inputs.events[index]);
}
else
{
/**
* 2. If the number of partitions does not match, it is safer to use full mapping.
* ```
* FineGrainedPipelineEvent◄──┐ ┌──FineGrainedPipelineEvent
* FineGrainedPipelineEvent◄──┼─┼──FineGrainedPipelineEvent
* FineGrainedPipelineEvent◄──┤ └──FineGrainedPipelineEvent
* FineGrainedPipelineEvent◄──┘
* ```
*/
for (const auto & output : events)
{
for (const auto & input : inputs.events)
output->addInput(input);
}
}
}
else
{
/**
* Use full mapping here.
* 1. for non fine grained inputs and non fine grained outputs
* The sizes of inputs and outputs must both be 1.
* ```
* PlainPipelineEvent◄────PlainPipelineEvent
* ```
* 2. for non fine grained inputs and fine grained outputs
* This is not possible: if fine grained is enabled for the outputs, it must also be enabled for the inputs.
* 3. for fine grained inputs and non fine grained outputs
* ```
* ┌──FineGrainedPipelineEvent
* PlainPipelineEvent◄──┼──FineGrainedPipelineEvent
* ├──FineGrainedPipelineEvent
* └──FineGrainedPipelineEvent
*
* PlainPipelineEvent◄────FineGrainedPipelineEvent
* ```
*/

// If the outputs are in fine grained mode, the inputs must also be.
RUNTIME_CHECK(inputs.is_fine_grained || !is_fine_grained);
for (const auto & output : events)
{
for (const auto & input : inputs.events)
output->addInput(input);
}
}
}
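
The wiring rule implemented by mapInputs can also be read as a standalone sketch reduced to plain vectors; Event here is a stand-in type for the example, not the PR's Event class:

#include <cassert>
#include <memory>
#include <vector>

struct Event
{
    std::vector<std::shared_ptr<Event>> inputs;
    void addInput(const std::shared_ptr<Event> & input) { inputs.push_back(input); }
};
using Events = std::vector<std::shared_ptr<Event>>;

// Outputs depend on inputs: 1:1 mapping when both sides are fine grained with
// matching partition counts, full (all-to-all) mapping otherwise.
void mapInputs(Events & outputs, bool outputs_fine_grained, const Events & inputs, bool inputs_fine_grained)
{
    if (outputs_fine_grained && inputs_fine_grained && outputs.size() == inputs.size())
    {
        for (size_t i = 0; i < outputs.size(); ++i)
            outputs[i]->addInput(inputs[i]);
    }
    else
    {
        // Fine grained outputs require fine grained inputs.
        assert(inputs_fine_grained || !outputs_fine_grained);
        for (const auto & output : outputs)
            for (const auto & input : inputs)
                output->addInput(input);
    }
}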

void Pipeline::addPlanNode(const PhysicalPlanNodePtr & plan_node)
{
plan_nodes.push_back(plan_node);
@@ -76,50 +76,76 @@ void Pipeline::toTreeString(FmtBuffer & buffer, size_t level) const
void Pipeline::addGetResultSink(ResultHandler && result_handler)
{
assert(!plan_nodes.empty());
auto get_result_sink = PhysicalGetResultSink::build(std::move(result_handler), plan_nodes.back());
auto get_result_sink = PhysicalGetResultSink::build(std::move(result_handler), log, plan_nodes.back());
addPlanNode(get_result_sink);
}

PipelineExecGroup Pipeline::buildExecGroup(PipelineExecutorStatus & exec_status, Context & context, size_t concurrency)
{
assert(!plan_nodes.empty());
PipelineExecGroupBuilder builder{exec_status};
PipelineExecGroupBuilder builder;
for (const auto & plan_node : plan_nodes)
plan_node->buildPipelineExec(builder, context, concurrency);
plan_node->buildPipelineExecGroup(exec_status, builder, context, concurrency);
return builder.build();
}

/**
* There are two execution modes in pipeline.
* 1. non fine grained mode
* A pipeline generates one event (PlainPipelineEvent).
* This means that all the operators in the pipeline are finished before the next pipeline is triggered.
* 2. fine grained mode
* A pipeline will generate n events (FineGrainedPipelineEvent), one for each data partition.
* There is a fine-grained mapping of events between pipelines, i.e. only events from the same data partition depend on each other.
* This means that once a data partition of the previous pipeline has finished, the operators of the next pipeline's corresponding data partition can start without waiting for the entire previous pipeline to finish.
*
* ┌──non fine grained mode──┐ ┌──fine grained mode──┐
* ┌──FineGrainedPipelineEvent◄───FineGrainedPipelineEvent
* PlainPipelineEvent◄───PlainPipelineEvent◄──┼──FineGrainedPipelineEvent◄───FineGrainedPipelineEvent
* └──FineGrainedPipelineEvent◄───FineGrainedPipelineEvent
*/
bool Pipeline::isFineGrainedMode() const
{
assert(!plan_nodes.empty());
// The source plan node determines whether the execution mode is fine grained or non-fine grained.
return plan_nodes.front()->getFineGrainedShuffle().enable();
}

Events Pipeline::toEvents(PipelineExecutorStatus & status, Context & context, size_t concurrency)
{
Events all_events;
toEvent(status, context, concurrency, all_events);
doToEvents(status, context, concurrency, all_events);
assert(!all_events.empty());
return all_events;
}

EventPtr Pipeline::toEvent(PipelineExecutorStatus & status, Context & context, size_t concurrency, Events & all_events)
{
// TODO support fine grained shuffle
// - a fine grained partition maps to an event
// - the event flow will be
// ```
// disable fine grained partition pipeline enable fine grained partition pipeline enable fine grained partition pipeline
// ┌───────────────FineGrainedPipelineEvent<────────────────FineGrainedPipelineEvent
// PlainPipelineEvent<────────┼───────────────FineGrainedPipelineEvent<────────────────FineGrainedPipelineEvent
// ├───────────────FineGrainedPipelineEvent<────────────────FineGrainedPipelineEvent
// └───────────────FineGrainedPipelineEvent<────────────────FineGrainedPipelineEvent
// ```
PipelineEvents Pipeline::toSelfEvents(PipelineExecutorStatus & status, Context & context, size_t concurrency)
{
auto memory_tracker = current_memory_tracker ? current_memory_tracker->shared_from_this() : nullptr;

auto plain_pipeline_event = std::make_shared<PlainPipelineEvent>(status, memory_tracker, log->identifier(), context, shared_from_this(), concurrency);
for (const auto & child : children)
Events self_events;
assert(!plan_nodes.empty());
if (isFineGrainedMode())
{
auto input = child->toEvent(status, context, concurrency, all_events);
assert(input);
plain_pipeline_event->addInput(input);
auto fine_grained_exec_group = buildExecGroup(status, context, concurrency);
for (auto & pipeline_exec : fine_grained_exec_group)
self_events.push_back(std::make_shared<FineGrainedPipelineEvent>(status, memory_tracker, log->identifier(), std::move(pipeline_exec)));
LOG_DEBUG(log, "Execute in fine grained model and generate {} fine grained pipeline event", self_events.size());
}
all_events.push_back(plain_pipeline_event);
return plain_pipeline_event;
else
{
self_events.push_back(std::make_shared<PlainPipelineEvent>(status, memory_tracker, log->identifier(), context, shared_from_this(), concurrency));
LOG_DEBUG(log, "Execute in non fine grained model and generate one plain pipeline event");
}
return {std::move(self_events), isFineGrainedMode()};
}

PipelineEvents Pipeline::doToEvents(PipelineExecutorStatus & status, Context & context, size_t concurrency, Events & all_events)
{
auto self_events = toSelfEvents(status, context, concurrency);
for (const auto & child : children)
self_events.mapInputs(child->doToEvents(status, context, concurrency, all_events));
all_events.insert(all_events.end(), self_events.events.cbegin(), self_events.events.cend());
return self_events;
}

bool Pipeline::isSupported(const tipb::DAGRequest & dag_request)
@@ -128,29 +234,26 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request)
traverseExecutors(
&dag_request,
[&](const tipb::Executor & executor) {
// TODO support fine grained shuffle.
if (FineGrainedShuffle(&executor).enable())
{
is_supported = false;
return false;
}
switch (executor.tp())
{
case tipb::ExecType::TypeProjection:
case tipb::ExecType::TypeSelection:
case tipb::ExecType::TypeLimit:
case tipb::ExecType::TypeTopN:
case tipb::ExecType::TypeAggregation:
case tipb::ExecType::TypeTableScan:
if (executor.tbl_scan().keep_order())
{
is_supported = false;
return false;
}
case tipb::ExecType::TypeProjection:
case tipb::ExecType::TypeSelection:
case tipb::ExecType::TypeLimit:
case tipb::ExecType::TypeTopN:
case tipb::ExecType::TypeExchangeSender:
case tipb::ExecType::TypeExchangeReceiver:
case tipb::ExecType::TypeExpand:
return true;
case tipb::ExecType::TypeAggregation:
// TODO support fine grained shuffle.
if (!FineGrainedShuffle(&executor).enable())
return true;
default:
is_supported = false;
return false;