From 1b18ac45e23703a7ab9db1e881005ac7db348ca7 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 11 Jul 2022 13:36:31 +0800 Subject: [PATCH] fix f --- dbms/src/Flash/Coprocessor/DAGContext.cpp | 2 + dbms/src/Flash/Coprocessor/DAGContext.h | 2 + .../Coprocessor/DAGQueryBlockInterpreter.cpp | 8 +--- .../Flash/Coprocessor/InterpreterUtils.cpp | 45 ++++++++++++++----- dbms/src/Flash/Coprocessor/InterpreterUtils.h | 1 + 5 files changed, 41 insertions(+), 17 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index 7877ea932eb..1cf7a0d6c87 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -30,6 +30,8 @@ extern const int DIVIDED_BY_ZERO; extern const int INVALID_TIME; } // namespace ErrorCodes +const String enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle"; + bool strictSqlMode(UInt64 sql_mode) { return sql_mode & TiDBSQLMode::STRICT_ALL_TABLES || sql_mode & TiDBSQLMode::STRICT_TRANS_TABLES; diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 9e91161b669..7bfc67afcad 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -121,6 +121,8 @@ inline bool enableFineGrainedShuffle(uint64_t stream_count) return stream_count > 0; } +extern const String enableFineGrainedShuffleExtraInfo; + /// A context used to track the information that needs to be passed around during DAG planning. class DAGContext { diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index aae4d1bb85c..764bf07f533 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -56,10 +56,6 @@ namespace FailPoints { extern const char minimum_block_size_for_cross_join[]; } // namespace FailPoints -namespace -{ -const String enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle"; -} DAGQueryBlockInterpreter::DAGQueryBlockInterpreter( Context & context_, @@ -438,7 +434,7 @@ void DAGQueryBlockInterpreter::executeAggregation( } } -void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc) +void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc, bool enable_fine_grained_shuffle) { orderStreams(pipeline, max_streams, sort_desc, 0, enable_fine_grained_shuffle, context, log); } @@ -446,7 +442,7 @@ void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDe void DAGQueryBlockInterpreter::executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns) { Int64 limit = query_block.limit_or_topn->topn().limit(); - orderStreams(pipeline, max_streams, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit, enable_fine_grained_shuffle, context, log); + orderStreams(pipeline, max_streams, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit, false, context, log); } void DAGQueryBlockInterpreter::recordProfileStreams(DAGPipeline & pipeline, const String & key) diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index c747823b69d..002a06d07b9 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -112,10 +113,14 @@ void orderStreams( size_t max_streams, SortDescription order_descr, Int64 limit, + bool enable_fine_grained_shuffle, const Context & context, const LoggerPtr & log) { const Settings & settings = context.getSettingsRef(); + String extra_info; + if (enable_fine_grained_shuffle) + extra_info = enableFineGrainedShuffleExtraInfo; pipeline.transform([&](auto & stream) { auto sorting_stream = std::make_shared(stream, order_descr, log->identifier(), limit); @@ -127,19 +132,37 @@ void orderStreams( sorting_stream->setLimits(limits); stream = sorting_stream; + stream->setExtraInfo(extra_info); }); - /// If there are several streams, we merge them into one - executeUnion(pipeline, max_streams, log, false, "for partial order"); + if (enable_fine_grained_shuffle) + { + pipeline.transform([&](auto & stream) { + stream = std::make_shared( + stream, + order_descr, + settings.max_block_size, + limit, + settings.max_bytes_before_external_sort, + context.getTemporaryPath(), + log->identifier()); + stream->setExtraInfo(enableFineGrainedShuffleExtraInfo); + }); + } + else + { + /// If there are several streams, we merge them into one + executeUnion(pipeline, max_streams, log, false, "for partial order"); - /// Merge the sorted blocks. - pipeline.firstStream() = std::make_shared( - pipeline.firstStream(), - order_descr, - settings.max_block_size, - limit, - settings.max_bytes_before_external_sort, - context.getTemporaryPath(), - log->identifier()); + /// Merge the sorted blocks. + pipeline.firstStream() = std::make_shared( + pipeline.firstStream(), + order_descr, + settings.max_block_size, + limit, + settings.max_bytes_before_external_sort, + context.getTemporaryPath(), + log->identifier()); + } } } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.h b/dbms/src/Flash/Coprocessor/InterpreterUtils.h index 36280f3b903..bd64346718c 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.h +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.h @@ -57,6 +57,7 @@ void orderStreams( size_t max_streams, SortDescription order_descr, Int64 limit, + bool enable_fine_grained_shuffle, const Context & context, const LoggerPtr & log); } // namespace DB