Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
f
  • Loading branch information
SeaRise committed Jul 11, 2022
1 parent efedcee commit 1b18ac4
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 17 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ extern const int DIVIDED_BY_ZERO;
extern const int INVALID_TIME;
} // namespace ErrorCodes

const String enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle";

bool strictSqlMode(UInt64 sql_mode)
{
return sql_mode & TiDBSQLMode::STRICT_ALL_TABLES || sql_mode & TiDBSQLMode::STRICT_TRANS_TABLES;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ inline bool enableFineGrainedShuffle(uint64_t stream_count)
return stream_count > 0;
}

extern const String enableFineGrainedShuffleExtraInfo;

/// A context used to track the information that needs to be passed around during DAG planning.
class DAGContext
{
Expand Down
8 changes: 2 additions & 6 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ namespace FailPoints
{
extern const char minimum_block_size_for_cross_join[];
} // namespace FailPoints
namespace
{
const String enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle";
}

DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(
Context & context_,
Expand Down Expand Up @@ -438,15 +434,15 @@ void DAGQueryBlockInterpreter::executeAggregation(
}
}

void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc)
void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc, bool enable_fine_grained_shuffle)
{
orderStreams(pipeline, max_streams, sort_desc, 0, enable_fine_grained_shuffle, context, log);
}

void DAGQueryBlockInterpreter::executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns)
{
Int64 limit = query_block.limit_or_topn->topn().limit();
orderStreams(pipeline, max_streams, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit, enable_fine_grained_shuffle, context, log);
orderStreams(pipeline, max_streams, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit, false, context, log);
}

void DAGQueryBlockInterpreter::recordProfileStreams(DAGPipeline & pipeline, const String & key)
Expand Down
45 changes: 34 additions & 11 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <DataStreams/SharedQueryBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Interpreters/Context.h>

Expand Down Expand Up @@ -112,10 +113,14 @@ void orderStreams(
size_t max_streams,
SortDescription order_descr,
Int64 limit,
bool enable_fine_grained_shuffle,
const Context & context,
const LoggerPtr & log)
{
const Settings & settings = context.getSettingsRef();
String extra_info;
if (enable_fine_grained_shuffle)
extra_info = enableFineGrainedShuffleExtraInfo;

pipeline.transform([&](auto & stream) {
auto sorting_stream = std::make_shared<PartialSortingBlockInputStream>(stream, order_descr, log->identifier(), limit);
Expand All @@ -127,19 +132,37 @@ void orderStreams(
sorting_stream->setLimits(limits);

stream = sorting_stream;
stream->setExtraInfo(extra_info);
});

/// If there are several streams, we merge them into one
executeUnion(pipeline, max_streams, log, false, "for partial order");
if (enable_fine_grained_shuffle)
{
pipeline.transform([&](auto & stream) {
stream = std::make_shared<MergeSortingBlockInputStream>(
stream,
order_descr,
settings.max_block_size,
limit,
settings.max_bytes_before_external_sort,
context.getTemporaryPath(),
log->identifier());
stream->setExtraInfo(enableFineGrainedShuffleExtraInfo);
});
}
else
{
/// If there are several streams, we merge them into one
executeUnion(pipeline, max_streams, log, false, "for partial order");

/// Merge the sorted blocks.
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
pipeline.firstStream(),
order_descr,
settings.max_block_size,
limit,
settings.max_bytes_before_external_sort,
context.getTemporaryPath(),
log->identifier());
/// Merge the sorted blocks.
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
pipeline.firstStream(),
order_descr,
settings.max_block_size,
limit,
settings.max_bytes_before_external_sort,
context.getTemporaryPath(),
log->identifier());
}
}
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ void orderStreams(
size_t max_streams,
SortDescription order_descr,
Int64 limit,
bool enable_fine_grained_shuffle,
const Context & context,
const LoggerPtr & log);
} // namespace DB

0 comments on commit 1b18ac4

Please sign in to comment.