Change one additional_input_at_end to many streams in ParallelInputsProcessor #5274

Merged: 22 commits, Jul 8, 2022
Changes from 4 commits
7 changes: 3 additions & 4 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
@@ -24,7 +24,7 @@ namespace DB
{
ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream(
const BlockInputStreams & inputs,
const BlockInputStreamPtr & additional_input_at_end,
const BlockInputStreams & additional_inputs_at_end,
const Aggregator::Params & params_,
const FileProviderPtr & file_provider_,
bool final_,
@@ -41,11 +41,10 @@ ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream(
, keys_size(params.keys_size)
, aggregates_size(params.aggregates_size)
, handler(*this)
, processor(inputs, additional_input_at_end, max_threads, handler, log)
, processor(inputs, additional_inputs_at_end, max_threads, handler, log)
{
children = inputs;
if (additional_input_at_end)
children.push_back(additional_input_at_end);
children.insert(children.end(), additional_inputs_at_end.begin(), additional_inputs_at_end.end());
}


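The constructor change above also drops a branch: appending an empty vector is a no-op, so the old `if (additional_input_at_end)` guard has no equivalent. A self-contained illustration of the `insert` idiom, with `int` standing in for the stream type:

    #include <cassert>
    #include <vector>

    int main()
    {
        std::vector<int> children{1, 2}; // stands in for `inputs`
        std::vector<int> additional;     // empty: no additional inputs at end

        // Appends nothing when `additional` is empty, so no null/empty check
        // is needed, unlike the old `if (additional_input_at_end) push_back(...)`.
        children.insert(children.end(), additional.begin(), additional.end());
        assert(children.size() == 2);

        additional = {100, 200}; // e.g. two non-joined-data streams
        children.insert(children.end(), additional.begin(), additional.end());
        assert(children.size() == 4);
        return 0;
    }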
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
@@ -36,7 +36,7 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
*/
ParallelAggregatingBlockInputStream(
const BlockInputStreams & inputs,
const BlockInputStreamPtr & additional_input_at_end,
const BlockInputStreams & additional_inputs_at_end,
const Aggregator::Params & params_,
const FileProviderPtr & file_provider_,
bool final_,
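At call sites the migration is mechanical: a nullable single stream becomes a possibly-empty vector. A minimal before/after sketch, assuming a caller that may or may not hold a non-joined-data stream; the variable `non_joined` is hypothetical, and the arguments after `final_` are elided because the diff above truncates the parameter list:

    // Old API: one stream, or nullptr when there is none.
    //   ParallelAggregatingBlockInputStream(inputs, non_joined /* may be nullptr */, params, file_provider, final_, ...);

    // New API: a vector; an empty vector means there is no second phase at all.
    BlockInputStreams additional_inputs;
    if (non_joined)
        additional_inputs.push_back(non_joined);
    // ParallelAggregatingBlockInputStream(inputs, additional_inputs, params, file_provider, final_, ...);

This is the same adaptation UnionBlockInputStream performs inline further down.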
161 changes: 101 additions & 60 deletions dbms/src/DataStreams/ParallelInputsProcessor.h
@@ -83,22 +83,21 @@ template <typename Handler, StreamUnionMode mode = StreamUnionMode::Basic>
class ParallelInputsProcessor
{
public:
/** additional_input_at_end - if not nullptr,
* then the blocks from this source will start to be processed only after all other sources are processed.
* This is done in the main thread.
/** additional_inputs_at_end - if not empty,
* then the blocks from these sources will start to be processed only after all other sources are processed.
*
* Intended for implementation of FULL and RIGHT JOIN
* - where the JOIN must first run in parallel, while recording which keys were not found,
* and only after that work completes, produce blocks for the keys that were not found.
*/
ParallelInputsProcessor(
const BlockInputStreams & inputs_,
const BlockInputStreamPtr & additional_input_at_end_,
const BlockInputStreams & additional_inputs_at_end_,
size_t max_threads_,
Handler & handler_,
const LoggerPtr & log_)
: inputs(inputs_)
, additional_input_at_end(additional_input_at_end_)
, additional_inputs_at_end(additional_inputs_at_end_)
, max_threads(std::min(inputs_.size(), max_threads_))
, handler(handler_)
, log(log_)
@@ -125,6 +124,10 @@ class ParallelInputsProcessor
if (!thread_manager)
thread_manager = newThreadManager();
active_threads = max_threads;
{
std::lock_guard lock(running_first_mutex);
running_first = max_threads;
}
for (size_t i = 0; i < max_threads; ++i)
thread_manager->schedule(true, handler.getName(), [this, i] { this->thread(i); });
}
@@ -136,21 +139,12 @@

for (auto & input : inputs)
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*input))
{
try
{
child->cancel(kill);
}
catch (...)
{
/** If you can not ask one or more sources to stop.
* (for example, the connection is broken for distributed query processing)
* - then do not care.
*/
LOG_FMT_ERROR(log, "Exception while cancelling {}", child->getName());
}
}
cancelStream(input, kill);
}

for (auto & input : additional_inputs_at_end)
{
cancelStream(input, kill);
}
}

@@ -188,6 +182,24 @@ class ParallelInputsProcessor
{}
};

void cancelStream(const BlockInputStreamPtr & stream, bool kill)
{
if (auto * child = dynamic_cast<IProfilingBlockInputStream *>(&*stream))
{
try
{
child->cancel(kill);
}
catch (...)
{
/** If one or more sources cannot be asked to stop
* (for example, the connection is broken for distributed query processing),
* just log the failure and carry on.
*/
LOG_FMT_ERROR(log, "Exception while cancelling {}", child->getName());
}
}
}

void publishPayload(BlockInputStreamPtr & stream, Block & block, size_t thread_num)
{
if constexpr (mode == StreamUnionMode::Basic)
@@ -205,27 +217,53 @@ class ParallelInputsProcessor

try
{
while (!finish)
{
InputData unprepared_input;
{
std::lock_guard lock(unprepared_inputs_mutex);
loop(thread_num);
}
catch (...)
{
exception = std::current_exception();
}

if (unprepared_inputs.empty())
break;
if (exception)
{
handler.onException(exception, thread_num);
}

unprepared_input = unprepared_inputs.front();
unprepared_inputs.pop();
}
handler.onFinishThread(thread_num);

unprepared_input.in->readPrefix();
if (additional_inputs_at_end.empty())
{
if (0 == --active_threads)
{
handler.onFinish();
}
return;
}

{
std::lock_guard lock(available_inputs_mutex);
available_inputs.push(unprepared_input);
}
{
std::unique_lock lock(running_first_mutex);
if (0 == --running_first)
{
/// Only one thread can get here, so there is no need to hold the `unprepared_inputs_mutex`
/// or `available_inputs_mutex` lock.
/// If an error occurred, `unprepared_inputs` and `available_inputs` may not be empty, so clear them.
unprepared_inputs = UnpreparedInputs{};
available_inputs = AvailableInputs{};
for (size_t i = 0; i < additional_inputs_at_end.size(); ++i)
unprepared_inputs.emplace(additional_inputs_at_end[i], i);

wait_first_done.notify_all();
}
else
{
wait_first_done.wait(lock, [this] {
return running_first == 0;
});
}
}

try
{
loop(thread_num);
}
catch (...)
@@ -238,37 +276,34 @@ class ParallelInputsProcessor
handler.onException(exception, thread_num);
}

handler.onFinishThread(thread_num);
if (0 == --active_threads)
{
handler.onFinish(); /// TODO: if `onFinish` or `onFinishThread` throws, std::terminate is called.
}
}

/// The last thread on the output indicates that there is no more data.
if (0 == --active_threads)
void loop(size_t thread_num)
{
while (!finish)
{
/// And then it processes an additional source, if there is one.
if (additional_input_at_end)
InputData unprepared_input;
{
try
{
additional_input_at_end->readPrefix();
while (Block block = additional_input_at_end->read())
publishPayload(additional_input_at_end, block, thread_num);
}
catch (...)
{
exception = std::current_exception();
}
std::lock_guard lock(unprepared_inputs_mutex);

if (exception)
{
handler.onException(exception, thread_num);
}
if (unprepared_inputs.empty())
break;

unprepared_input = unprepared_inputs.front();
unprepared_inputs.pop();
}

handler.onFinish(); /// TODO If in `onFinish` or `onFinishThread` there is an exception, then std::terminate is called.
unprepared_input.in->readPrefix();

{
std::lock_guard lock(available_inputs_mutex);
available_inputs.push(unprepared_input);
}
}
}

void loop(size_t thread_num)
{
while (!finish) /// You may need to stop work earlier than all sources run out.
{
InputData input;
@@ -318,8 +353,8 @@ class ParallelInputsProcessor
}
}

BlockInputStreams inputs;
BlockInputStreamPtr additional_input_at_end;
const BlockInputStreams inputs;
const BlockInputStreams additional_inputs_at_end;
unsigned max_threads;

Handler & handler;
@@ -359,6 +394,12 @@ class ParallelInputsProcessor
/// For operations with unprepared_inputs.
std::mutex unprepared_inputs_mutex;

/// For waiting until all threads have finished reading the `inputs` sources.
/// Only after that may the `additional_inputs_at_end` sources be processed.
std::mutex running_first_mutex;
std::condition_variable wait_first_done;
size_t running_first{0};

/// How many sources ran out.
std::atomic<size_t> active_threads{0};
/// Finish the threads work (before the sources run out).
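The heart of the new processor is the hand-off between the two phases: every worker decrements `running_first` once it has exhausted the first-phase inputs, the last one to arrive reseeds `unprepared_inputs` from `additional_inputs_at_end` and calls `wait_first_done.notify_all()`, and every thread then runs `loop` again over the new sources. Below is a minimal, self-contained sketch of that last-worker-reseeds barrier, assuming nothing from TiFlash; all names (`TwoPhasePool`, integer items) are illustrative:

    #include <condition_variable>
    #include <cstdio>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    class TwoPhasePool
    {
    public:
        TwoPhasePool(std::vector<int> first, std::vector<int> second, size_t workers)
            : second_phase(std::move(second))
            , running_first(workers)
        {
            for (int v : first)
                queue.push(v);
        }

        /// Pop the next item; blocks at the phase boundary. Returns false when
        /// both phases are exhausted (the analogue of `loop` finding no input).
        bool pop(int & out)
        {
            std::unique_lock lock(mutex);
            while (true)
            {
                if (!queue.empty())
                {
                    out = queue.front();
                    queue.pop();
                    return true;
                }
                if (second_phase.empty())
                    return false; /// nothing left in either phase
                /// This worker is done with the first phase (cf. `--running_first`).
                if (0 == --running_first)
                {
                    /// Last one through: reseed the shared queue (cf. refilling
                    /// `unprepared_inputs` from `additional_inputs_at_end`).
                    for (int v : second_phase)
                        queue.push(v);
                    second_phase.clear();
                    cv.notify_all(); /// cf. `wait_first_done.notify_all()`
                }
                else
                {
                    cv.wait(lock, [this] { return running_first == 0; });
                }
            }
        }

    private:
        std::queue<int> queue;
        std::vector<int> second_phase;
        size_t running_first;
        std::mutex mutex;
        std::condition_variable cv;
    };

    int main()
    {
        TwoPhasePool pool({1, 2, 3, 4}, {100, 200}, /*workers=*/2);
        auto work = [&pool] {
            int v;
            while (pool.pop(v))
                std::printf("%d\n", v); /// 100 and 200 always print after 1..4
        };
        std::thread a(work), b(work);
        a.join();
        b.join();
        return 0;
    }

The real code layers `readPrefix()` and the `available_inputs` queue on top of this, and skips the barrier entirely when `additional_inputs_at_end` is empty (the early-return branch in `thread` above), so queries without a FULL or RIGHT JOIN pay nothing for it.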
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/UnionBlockInputStream.h
@@ -101,7 +101,7 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream
: output_queue(std::min(inputs.size(), max_threads) * 5) // reduce contention
, log(Logger::get(NAME, req_id))
, handler(*this)
, processor(inputs, additional_input_at_end, max_threads, handler, log)
, processor(inputs, additional_input_at_end ? BlockInputStreams{additional_input_at_end} : BlockInputStreams{}, max_threads, handler, log)
, exception_callback(exception_callback_)
{
// TODO: assert capacity of output_queue is not less than processor.getMaxThreads()
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -386,17 +386,17 @@ void DAGQueryBlockInterpreter::executeAggregation(
if (pipeline.streams.size() > 1)
{
const Settings & settings = context.getSettingsRef();
BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log);
pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(
pipeline.streams,
stream_with_non_joined_data,
pipeline.streams_with_non_joined_data,
params,
context.getFileProvider(),
true,
max_streams,
settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads) : static_cast<size_t>(settings.max_threads),
log->identifier());
pipeline.streams.resize(1);
pipeline.streams_with_non_joined_data.clear();
// should record profile streams for agg before restoring concurrency. See #3804.
recordProfileStreams(pipeline, query_block.aggregation_name);
restorePipelineConcurrency(pipeline);
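With the processor accepting many streams, the interpreter no longer has to collapse the non-joined remainder into one stream: the removed `combinedNonJoinedDataStream` call produced a single `BlockInputStreamPtr`, which the old processor drained serially in its last surviving thread (see the removed `additional_input_at_end` loop in ParallelInputsProcessor.h above). A comment-level sketch of the two shapes; `combinedNonJoinedDataStream` is treated as opaque since its definition is not part of this diff:

    // Before: the non-joined streams were combined into one stream (as the
    // helper's name suggests), read to completion by a single thread at the
    // tail of the query:
    //   BlockInputStreamPtr tail = combinedNonJoinedDataStream(pipeline, max_streams, log);
    //   ... pass `tail` as the lone additional input ...

    // After: the vector goes through untouched, and the processor's second
    // phase hands one stream to each of up to max_threads workers:
    //   ... pass `pipeline.streams_with_non_joined_data` directly ...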
8 changes: 7 additions & 1 deletion dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -989,9 +989,15 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
{
BlockInputStreams additional_inputs;
if (pipeline.stream_with_non_joined_data)
{
additional_inputs.push_back(pipeline.stream_with_non_joined_data);
}

pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(
pipeline.streams,
pipeline.stream_with_non_joined_data,
additional_inputs,
params,
file_provider,
final,