Skip to content

Commit

Permalink
Pipeline: support fine grained shuffle (#6934)
Browse files Browse the repository at this point in the history
ref #6518
  • Loading branch information
SeaRise authored Mar 20, 2023
1 parent 4c4e2c0 commit ac240b4
Show file tree
Hide file tree
Showing 67 changed files with 759 additions and 216 deletions.
38 changes: 36 additions & 2 deletions dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,43 @@ tipb::ExchangeType ExchangeSenderBinder::getType() const
return type;
}

ExecutorBinderPtr compileExchangeSender(ExecutorBinderPtr input, size_t & executor_index, tipb::ExchangeType exchange_type)
ExecutorBinderPtr compileExchangeSender(
ExecutorBinderPtr input,
size_t & executor_index,
tipb::ExchangeType exchange_type,
ASTPtr partition_key_list,
uint64_t fine_grained_shuffle_stream_count)
{
ExecutorBinderPtr exchange_sender = std::make_shared<mock::ExchangeSenderBinder>(executor_index, input->output_schema, exchange_type);
std::vector<size_t> partition_key_indexes;
for (const auto & partition_key : partition_key_list->children)
{
size_t schema_index = 0;
for (; schema_index < input->output_schema.size(); ++schema_index)
{
if (input->output_schema[schema_index].first == partition_key->getColumnName())
{
partition_key_indexes.push_back(schema_index);
break;
}
}
auto schema_string = [&]() {
FmtBuffer buffer;
buffer.joinStr(
input->output_schema.cbegin(),
input->output_schema.cend(),
[](const auto & item, FmtBuffer & buf) { buf.append(item.first); },
", ");
return buffer.toString();
};
if (schema_index == input->output_schema.size())
throw Exception(fmt::format("Unknown partition key: {}, schema is [{}]", partition_key->getColumnName(), schema_string()));
}
ExecutorBinderPtr exchange_sender = std::make_shared<mock::ExchangeSenderBinder>(
executor_index,
input->output_schema,
exchange_type,
partition_key_indexes,
fine_grained_shuffle_stream_count);
exchange_sender->children.push_back(input);
return exchange_sender;
}
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/Debug/MockExecutor/ExchangeSenderBinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ namespace DB::mock
class ExchangeSenderBinder : public ExecutorBinder
{
public:
ExchangeSenderBinder(size_t & index, const DAGSchema & output, tipb::ExchangeType type_, const std::vector<size_t> & partition_keys_ = {}, uint64_t fine_grained_shuffle_stream_count_ = 0)
ExchangeSenderBinder(
size_t & index,
const DAGSchema & output,
tipb::ExchangeType type_,
const std::vector<size_t> & partition_keys_ = {},
uint64_t fine_grained_shuffle_stream_count_ = 0)
: ExecutorBinder(index, "exchange_sender_" + std::to_string(index), output)
, type(type_)
, partition_keys(partition_keys_)
Expand All @@ -42,5 +47,10 @@ class ExchangeSenderBinder : public ExecutorBinder
uint64_t fine_grained_shuffle_stream_count;
};

ExecutorBinderPtr compileExchangeSender(ExecutorBinderPtr input, size_t & executor_index, tipb::ExchangeType exchange_type);
ExecutorBinderPtr compileExchangeSender(
ExecutorBinderPtr input,
size_t & executor_index,
tipb::ExchangeType exchange_type,
ASTPtr partition_key_list = {},
uint64_t fine_grained_shuffle_stream_count = 0);
} // namespace DB::mock
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
{
extra_info = String(enableFineGrainedShuffleExtraInfo);
RUNTIME_CHECK(exchange_sender.tp() == tipb::ExchangeType::Hash, ExchangeType_Name(exchange_sender.tp()));
RUNTIME_CHECK(stream_count <= 1024, stream_count);
RUNTIME_CHECK(stream_count <= maxFineGrainedStreamCount, stream_count);
}
pipeline.transform([&](auto & stream) {
// construct writer
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Coprocessor/FineGrainedShuffle.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,20 @@ namespace DB
{
static constexpr std::string_view enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle";

static constexpr size_t maxFineGrainedStreamCount = 1024;

inline bool enableFineGrainedShuffle(uint64_t stream_count)
{
return stream_count > 0;
}

struct FineGrainedShuffle
{
FineGrainedShuffle()
: stream_count(0)
, batch_size(0)
{}

explicit FineGrainedShuffle(const tipb::Executor * executor)
: stream_count(executor ? executor->fine_grained_shuffle_stream_count() : 0)
, batch_size(executor ? executor->fine_grained_shuffle_batch_size() : 0)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Exec/PipelineExec.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace DB
// The executor for push model operator.
// A pipeline will generate multiple pipeline_execs.
// data flow: source --> transform --> .. --> transform --> sink
class PipelineExec
class PipelineExec : private boost::noncopyable
{
public:
PipelineExec(
Expand Down
8 changes: 0 additions & 8 deletions dbms/src/Flash/Pipeline/Exec/PipelineExecBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

namespace DB
{
class PipelineExecutorStatus;

struct PipelineExecBuilder
{
SourceOpPtr source_op;
Expand All @@ -41,12 +39,6 @@ struct PipelineExecGroupBuilder
using BuilderGroup = std::vector<PipelineExecBuilder>;
BuilderGroup group;

explicit PipelineExecGroupBuilder(PipelineExecutorStatus & exec_status_)
: exec_status(exec_status_)
{}

PipelineExecutorStatus & exec_status;

size_t concurrency = 0;

void init(size_t init_concurrency);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Pipeline/Exec/tests/gtest_simple_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ class SimpleOperatorTestRunner : public DB::tests::ExecutorTest
PhysicalPlan physical_plan{*context.context, ""};
physical_plan.build(request.get());
assert(!result_handler.isIgnored());
auto plan_tree = PhysicalGetResultSink::build(std::move(result_handler), physical_plan.outputAndOptimize());
auto plan_tree = PhysicalGetResultSink::build(std::move(result_handler), Logger::get(), physical_plan.outputAndOptimize());

PipelineExecGroupBuilder group_builder{exec_status};
PipelineExecGroupBuilder group_builder;
PhysicalPlanVisitor::visitPostOrder(plan_tree, [&](const PhysicalPlanNodePtr & plan) {
assert(plan);
plan->buildPipelineExec(group_builder, *context.context, /*concurrency=*/1);
plan->buildPipelineExecGroup(exec_status, group_builder, *context.context, /*concurrency=*/1);
});
auto result = group_builder.build();
assert(result.size() == 1);
Expand Down
173 changes: 138 additions & 35 deletions dbms/src/Flash/Pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Flash/Pipeline/Pipeline.h>
#include <Flash/Pipeline/Schedule/Events/Event.h>
#include <Flash/Pipeline/Schedule/Events/FineGrainedPipelineEvent.h>
#include <Flash/Pipeline/Schedule/Events/PlainPipelineEvent.h>
#include <Flash/Planner/PhysicalPlanNode.h>
#include <Flash/Planner/Plans/PhysicalGetResultSink.h>
Expand All @@ -34,6 +35,85 @@ FmtBuffer & addPrefix(FmtBuffer & buffer, size_t level)
}
} // namespace

PipelineEvents::PipelineEvents(Events && events_, bool is_fine_grained_)
: events(std::move(events_))
, is_fine_grained(is_fine_grained_)
{
RUNTIME_CHECK(!events.empty());
// For non fine grained mode, the size of events must be 1.
RUNTIME_CHECK(is_fine_grained || events.size() == 1);
}

void PipelineEvents::mapInputs(const PipelineEvents & inputs)
{
/// The self events is output.
if (inputs.is_fine_grained && is_fine_grained)
{
if (inputs.events.size() == events.size())
{
/**
* 1. If the number of partitions match, use fine grained mapping here.
* ```
* FineGrainedPipelineEvent◄────FineGrainedPipelineEvent
* FineGrainedPipelineEvent◄────FineGrainedPipelineEvent
* FineGrainedPipelineEvent◄────FineGrainedPipelineEvent
* FineGrainedPipelineEvent◄────FineGrainedPipelineEvent
* ```
*/
size_t partition_num = inputs.events.size();
for (size_t index = 0; index < partition_num; ++index)
events[index]->addInput(inputs.events[index]);
}
else
{
/**
* 2. If the number of partitions does not match, it is safer to use full mapping.
* ```
* FineGrainedPipelineEvent◄──┐ ┌──FineGrainedPipelineEvent
* FineGrainedPipelineEvent◄──┼─┼──FineGrainedPipelineEvent
* FineGrainedPipelineEvent◄──┤ └──FineGrainedPipelineEvent
* FineGrainedPipelineEvent◄──┘
* ```
*/
for (const auto & output : events)
{
for (const auto & input : inputs.events)
output->addInput(input);
}
}
}
else
{
/**
* Use full mapping here.
* 1. for non fine grained inputs and non fine grained outputs
* The size of inputs and outputs must be the same and 1.
* ```
* PlainPipelineEvent◄────PlainPipelineEvent
* ```
* 2. for non fine grained inputs and fine grained outputs
* This is not possible, if fine-grained is enabled in outputs, then inputs must also be enabled.
* 3. for fine grained inputs and non fine grained outputs
* ```
* ┌──FineGrainedPipelineEvent
* PlainPipelineEvent◄──┼──FineGrainedPipelineEvent
* ├──FineGrainedPipelineEvent
* └──FineGrainedPipelineEvent
*
* PlainPipelineEvent◄────FineGrainedPipelineEvent
* ```
*/

// If the outputs is fine grained model, the intputs must also be.
RUNTIME_CHECK(inputs.is_fine_grained || !is_fine_grained);
for (const auto & output : events)
{
for (const auto & input : inputs.events)
output->addInput(input);
}
}
}

void Pipeline::addPlanNode(const PhysicalPlanNodePtr & plan_node)
{
plan_nodes.push_back(plan_node);
Expand Down Expand Up @@ -76,50 +156,76 @@ void Pipeline::toTreeString(FmtBuffer & buffer, size_t level) const
void Pipeline::addGetResultSink(ResultHandler && result_handler)
{
assert(!plan_nodes.empty());
auto get_result_sink = PhysicalGetResultSink::build(std::move(result_handler), plan_nodes.back());
auto get_result_sink = PhysicalGetResultSink::build(std::move(result_handler), log, plan_nodes.back());
addPlanNode(get_result_sink);
}

PipelineExecGroup Pipeline::buildExecGroup(PipelineExecutorStatus & exec_status, Context & context, size_t concurrency)
{
assert(!plan_nodes.empty());
PipelineExecGroupBuilder builder{exec_status};
PipelineExecGroupBuilder builder;
for (const auto & plan_node : plan_nodes)
plan_node->buildPipelineExec(builder, context, concurrency);
plan_node->buildPipelineExecGroup(exec_status, builder, context, concurrency);
return builder.build();
}

/**
* There are two execution modes in pipeline.
* 1. non fine grained mode
* A pipeline generates an event(PlainPipelineEvent).
* This means that all the operators in the pipeline are finished before the next pipeline is triggered.
* 2. fine grained mode
* A pipeline will generate n Events(FineGrainedPipelineEvent), one for each data partition.
* There is a fine-grained mapping of Events between Pipelines, e.g. only Events from the same data partition will have dependencies on each other.
* This means that once some data partition of the previous pipeline has finished, the operators of the next pipeline's corresponding data partition can be started without having to wait for the entire pipeline to finish.
*
* ┌──non fine grained mode──┐ ┌──fine grained mode──┐
* ┌──FineGrainedPipelineEvent◄───FineGrainedPipelineEvent
* PlainPipelineEvent◄───PlainPipelineEvent◄──┼──FineGrainedPipelineEvent◄───FineGrainedPipelineEvent
* └──FineGrainedPipelineEvent◄───FineGrainedPipelineEvent
*/
bool Pipeline::isFineGrainedMode() const
{
assert(!plan_nodes.empty());
// The source plan node determines whether the execution mode is fine grained or non-fine grained.
return plan_nodes.front()->getFineGrainedShuffle().enable();
}

Events Pipeline::toEvents(PipelineExecutorStatus & status, Context & context, size_t concurrency)
{
Events all_events;
toEvent(status, context, concurrency, all_events);
doToEvents(status, context, concurrency, all_events);
assert(!all_events.empty());
return all_events;
}

EventPtr Pipeline::toEvent(PipelineExecutorStatus & status, Context & context, size_t concurrency, Events & all_events)
{
// TODO support fine grained shuffle
// - a fine grained partition maps to an event
// - the event flow will be
// ```
// disable fine grained partition pipeline enable fine grained partition pipeline enable fine grained partition pipeline
// ┌───────────────FineGrainedPipelineEvent<────────────────FineGrainedPipelineEvent
// PlainPipelineEvent<────────┼───────────────FineGrainedPipelineEvent<────────────────FineGrainedPipelineEvent
// ├───────────────FineGrainedPipelineEvent<────────────────FineGrainedPipelineEvent
// └───────────────FineGrainedPipelineEvent<────────────────FineGrainedPipelineEvent
// ```
PipelineEvents Pipeline::toSelfEvents(PipelineExecutorStatus & status, Context & context, size_t concurrency)
{
auto memory_tracker = current_memory_tracker ? current_memory_tracker->shared_from_this() : nullptr;

auto plain_pipeline_event = std::make_shared<PlainPipelineEvent>(status, memory_tracker, log->identifier(), context, shared_from_this(), concurrency);
for (const auto & child : children)
Events self_events;
assert(!plan_nodes.empty());
if (isFineGrainedMode())
{
auto input = child->toEvent(status, context, concurrency, all_events);
assert(input);
plain_pipeline_event->addInput(input);
auto fine_grained_exec_group = buildExecGroup(status, context, concurrency);
for (auto & pipeline_exec : fine_grained_exec_group)
self_events.push_back(std::make_shared<FineGrainedPipelineEvent>(status, memory_tracker, log->identifier(), std::move(pipeline_exec)));
LOG_DEBUG(log, "Execute in fine grained model and generate {} fine grained pipeline event", self_events.size());
}
all_events.push_back(plain_pipeline_event);
return plain_pipeline_event;
else
{
self_events.push_back(std::make_shared<PlainPipelineEvent>(status, memory_tracker, log->identifier(), context, shared_from_this(), concurrency));
LOG_DEBUG(log, "Execute in non fine grained model and generate one plain pipeline event");
}
return {std::move(self_events), isFineGrainedMode()};
}

PipelineEvents Pipeline::doToEvents(PipelineExecutorStatus & status, Context & context, size_t concurrency, Events & all_events)
{
auto self_events = toSelfEvents(status, context, concurrency);
for (const auto & child : children)
self_events.mapInputs(child->doToEvents(status, context, concurrency, all_events));
all_events.insert(all_events.end(), self_events.events.cbegin(), self_events.events.cend());
return self_events;
}

bool Pipeline::isSupported(const tipb::DAGRequest & dag_request)
Expand All @@ -128,29 +234,26 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request)
traverseExecutors(
&dag_request,
[&](const tipb::Executor & executor) {
// TODO support fine grained shuffle.
if (FineGrainedShuffle(&executor).enable())
{
is_supported = false;
return false;
}
switch (executor.tp())
{
case tipb::ExecType::TypeProjection:
case tipb::ExecType::TypeSelection:
case tipb::ExecType::TypeLimit:
case tipb::ExecType::TypeTopN:
case tipb::ExecType::TypeAggregation:
case tipb::ExecType::TypeTableScan:
if (executor.tbl_scan().keep_order())
{
is_supported = false;
return false;
}
case tipb::ExecType::TypeProjection:
case tipb::ExecType::TypeSelection:
case tipb::ExecType::TypeLimit:
case tipb::ExecType::TypeTopN:
case tipb::ExecType::TypeExchangeSender:
case tipb::ExecType::TypeExchangeReceiver:
case tipb::ExecType::TypeExpand:
return true;
case tipb::ExecType::TypeAggregation:
// TODO support fine grained shuffle.
if (!FineGrainedShuffle(&executor).enable())
return true;
default:
is_supported = false;
return false;
Expand Down
Loading

0 comments on commit ac240b4

Please sign in to comment.