Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline: use notify instead of polling for ResultQueue #9065

Merged
merged 70 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 64 commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
7036d82
for task
SeaRise Mar 26, 2024
c0321fb
operator
SeaRise Mar 26, 2024
11b364f
for profile
SeaRise Mar 26, 2024
ff3fea4
fix build
SeaRise Mar 26, 2024
9511939
fix notify
SeaRise Mar 26, 2024
c7084d5
minor refactor
SeaRise Mar 26, 2024
2512249
__thread
SeaRise Mar 26, 2024
9914a58
tmp save
SeaRise Mar 26, 2024
71275bf
tmp
SeaRise Mar 26, 2024
afff674
update
SeaRise Mar 26, 2024
2b2b029
fix
SeaRise Mar 26, 2024
c701633
u
SeaRise Mar 26, 2024
ca9d6f3
refactor
SeaRise Mar 26, 2024
f93842e
fiux
SeaRise Mar 26, 2024
b453caa
Merge branch 'master' into notify_instead_of_pollinh
SeaRise Mar 26, 2024
4b6c363
final
SeaRise Mar 26, 2024
f0c6bfd
final final
SeaRise Mar 26, 2024
a05bd14
refine for lock
SeaRise Mar 27, 2024
0ba130f
address comment
SeaRise Mar 27, 2024
20d297d
Update dbms/src/Storages/DeltaMerge/ReadThread/WorkQueue.h
SeaRise Mar 27, 2024
28b1a58
update
SeaRise Mar 28, 2024
2b15547
update
SeaRise Mar 29, 2024
acafb58
Merge branch 'master' into shared_queue_notify
SeaRise Mar 29, 2024
00543bf
u
SeaRise Mar 29, 2024
ba964ab
update
SeaRise Mar 29, 2024
6c33a4a
Merge branch 'master' into shared_queue_notify
SeaRise Mar 29, 2024
9e1270d
Update dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h
SeaRise Mar 29, 2024
ace773f
fix
SeaRise Mar 29, 2024
8234f45
u
SeaRise Mar 29, 2024
30208c3
fix
SeaRise Apr 1, 2024
a0bffb6
add test case
SeaRise Apr 1, 2024
5b2610b
Update dbms/src/Operators/GetResultSinkOp.cpp
SeaRise Apr 1, 2024
9af4fe3
Update dbms/src/Flash/Executor/PipelineExecutorContext.h
SeaRise Apr 1, 2024
1e63a3b
Update dbms/src/Flash/Executor/PipelineExecutorContext.cpp
SeaRise Apr 1, 2024
72a4c5f
refine
SeaRise Apr 1, 2024
122bcf5
u
SeaRise Apr 1, 2024
05eae12
some refine
SeaRise Apr 1, 2024
81d7936
update
SeaRise Apr 1, 2024
e129791
update
SeaRise Apr 1, 2024
430cce5
Merge branch 'master' into shared_queue_notify
SeaRise Apr 7, 2024
8f57d6f
refine
SeaRise Apr 7, 2024
9747de4
fix
SeaRise Apr 7, 2024
0061987
Merge branch 'master' into shared_queue_notify
SeaRise May 6, 2024
306af86
fix
SeaRise May 6, 2024
41e6d57
Merge branch 'master' into shared_queue_notify
SeaRise May 8, 2024
9049a5b
Update dbms/src/Operators/Operator.cpp
SeaRise May 8, 2024
08ba3a1
Update dbms/src/Operators/Operator.cpp
SeaRise May 8, 2024
3d717c4
Update dbms/src/Operators/Operator.cpp
SeaRise May 8, 2024
1d9563c
Update LooseBoundedMPMCQueue.h
SeaRise May 10, 2024
e533eee
Merge branch 'master' into shared_queue_notify
SeaRise May 20, 2024
db76363
refine result queue used in join restore
SeaRise May 20, 2024
92caf68
minor fix
SeaRise May 20, 2024
172b76b
Merge branch 'master' into refine_result_queue
SeaRise May 20, 2024
2b7b12b
u
SeaRise May 20, 2024
fbd2bb0
fmt
SeaRise May 20, 2024
55b469f
refine for result queue
SeaRise May 20, 2024
c2a39d2
u
SeaRise May 20, 2024
cbbd91b
fix
SeaRise May 20, 2024
ee5f75c
u
SeaRise May 20, 2024
39aa87d
u
SeaRise May 20, 2024
e2feba7
minor fix
SeaRise May 20, 2024
ec5d595
u
SeaRise May 21, 2024
357e7c4
u
SeaRise May 21, 2024
b0b9401
minor update
SeaRise May 21, 2024
8fb9698
Update dbms/src/Flash/Executor/ResultQueue_fwd.h
SeaRise May 22, 2024
bc7e151
Update dbms/src/Operators/GetResultSinkOp.cpp
SeaRise May 22, 2024
d48d24b
Update dbms/src/Flash/Pipeline/Schedule/Tasks/StreamRestoreTask.cpp
SeaRise May 22, 2024
eae3bbe
Merge branch 'master' into refine_result_queue
SeaRise May 23, 2024
6467e60
merge master
SeaRise May 30, 2024
5c94523
Merge branch 'master' into refine_result_queue
ti-chi-bot[bot] Jun 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions dbms/src/Flash/Executor/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,7 @@ void PipelineExecutor::wait()
void PipelineExecutor::consume(ResultHandler & result_handler)
{
assert(result_handler);
if (unlikely(context.isTest()))
{
// In test mode, a single query should take no more than 5 minutes to execute.
static std::chrono::minutes timeout(5);
exec_context.consumeFor(result_handler, timeout);
}
else
{
exec_context.consume(result_handler);
}
exec_context.consume(result_handler);
}

ExecutionResult PipelineExecutor::execute(ResultHandler && result_handler)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Executor/PipelineExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <Flash/Executor/PipelineExecutorContext.h>
#include <Flash/Executor/QueryExecutor.h>
#include <Flash/Executor/ResultQueue.h>
#include <Flash/Executor/ResultQueue_fwd.h>

namespace DB
{
Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Flash/Executor/PipelineExecutorContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Flash/Executor/PipelineExecutorContext.h>
#include <Flash/Executor/ResultQueue.h>
#include <Flash/Pipeline/Schedule/TaskScheduler.h>
#include <Operators/SharedQueue.h>

Expand Down Expand Up @@ -154,6 +155,7 @@ void PipelineExecutorContext::cancel()
if (is_cancelled.compare_exchange_strong(origin_value, true, std::memory_order_release))
{
cancelSharedQueues();
cancelResultQueueIfNeed();
if likely (TaskScheduler::instance && !query_id.empty())
TaskScheduler::instance->cancel(query_id, resource_group_name);
}
Expand All @@ -163,6 +165,7 @@ ResultQueuePtr PipelineExecutorContext::toConsumeMode(size_t queue_size)
{
std::lock_guard lock(mu);
RUNTIME_ASSERT(!result_queue.has_value());
RUNTIME_CHECK_MSG(!isCancelled(), "query has been cancelled.");
result_queue.emplace(std::make_shared<ResultQueue>(queue_size));
return *result_queue;
}
Expand All @@ -185,4 +188,17 @@ void PipelineExecutorContext::cancelSharedQueues()
for (const auto & shared_queue : tmp)
shared_queue->cancel();
}

void PipelineExecutorContext::cancelResultQueueIfNeed()
{
ResultQueue * tmp{nullptr};
{
std::lock_guard lock(mu);
if (isWaitMode())
return;
tmp = (*result_queue).get();
}
assert(tmp);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why tmp is always not nullptr here?

Copy link
Contributor Author

@SeaRise SeaRise May 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because if isWaitMode() is false, the result_queue must not be nullptr

tmp->cancel();
}
} // namespace DB
51 changes: 2 additions & 49 deletions dbms/src/Flash/Executor/PipelineExecutorContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <Core/AutoSpillTrigger.h>
#include <Flash/Executor/ExecutionResult.h>
#include <Flash/Executor/ResultHandler.h>
#include <Flash/Executor/ResultQueue.h>
#include <Flash/Executor/ResultQueue_fwd.h>
#include <Flash/Pipeline/Schedule/Tasks/TaskProfileInfo.h>

#include <atomic>
Expand Down Expand Up @@ -96,54 +96,6 @@ class PipelineExecutorContext : private boost::noncopyable

void consume(ResultHandler & result_handler);

template <typename Duration>
void consumeFor(ResultHandler & result_handler, const Duration & timeout_duration)
{
RUNTIME_ASSERT(result_handler);
auto consumed_result_queue = getConsumedResultQueue();
bool is_timeout = false;
try
{
Block ret;
while (true)
{
auto res = consumed_result_queue->popTimeout(ret, timeout_duration);
if (res == MPMCQueueResult::TIMEOUT)
{
is_timeout = true;
break;
}
else if (res == MPMCQueueResult::OK)
{
result_handler(ret);
}
else
{
break;
}
}
}
catch (...)
{
// If result_handler throws an error, here should notify the query to terminate, and wait for the end of the query.
onErrorOccurred(std::current_exception());
}
if (is_timeout)
{
LOG_WARNING(log, "wait timeout");
onErrorOccurred(timeout_err_msg);
throw Exception(timeout_err_msg);
}
else
{
// In order to ensure that `decActiveRefCount` has finished calling at this point
// and avoid referencing the already destructed `mu` in `decActiveRefCount`.
std::unique_lock lock(mu);
cv.wait(lock, [&] { return 0 == active_ref_count; });
}
LOG_DEBUG(log, "query finished and consume done");
}

void cancel();

ALWAYS_INLINE bool isCancelled() const { return is_cancelled.load(std::memory_order_acquire); }
Expand Down Expand Up @@ -188,6 +140,7 @@ class PipelineExecutorContext : private boost::noncopyable
ResultQueuePtr getConsumedResultQueue();

void cancelSharedQueues();
void cancelResultQueueIfNeed();

private:
const String query_id;
Expand Down
29 changes: 25 additions & 4 deletions dbms/src/Flash/Executor/ResultQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,35 @@

#pragma once

#include <Common/MPMCQueue.h>
#include <Common/PtrHolder.h>
#include <Common/LooseBoundedMPMCQueue.h>
#include <Core/Block.h>
#include <Flash/Pipeline/Schedule/Tasks/NotifyFuture.h>

#include <mutex>
#include <memory>

namespace DB
{
using ResultQueue = MPMCQueue<Block>;
class ResultQueue : public NotifyFuture
{
public:
explicit ResultQueue(size_t queue_size)
: queue(queue_size)
{}

// read
MPMCQueueResult pop(Block & block) { return queue.pop(block); }

// write
MPMCQueueResult push(Block && block) { return queue.push(block); }
MPMCQueueResult tryPush(Block && block) { return queue.tryPush(block); }
void registerTask(TaskPtr && task) override { queue.registerPipeWriteTask(std::move(task)); }

// finish/cancel
bool finish() { return queue.finish(); }
bool cancel() { return queue.cancel(); }

private:
LooseBoundedMPMCQueue<Block> queue;
};
using ResultQueuePtr = std::shared_ptr<ResultQueue>;
} // namespace DB
23 changes: 23 additions & 0 deletions dbms/src/Flash/Executor/ResultQueue_fwd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2023 PingCAP, Inc.
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>

namespace DB
{
class ResultQueue;
using ResultQueuePtr = std::shared_ptr<ResultQueue>;
} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/Exception.h>
#include <Common/ThreadManager.h>
#include <Flash/Executor/PipelineExecutorContext.h>
#include <Flash/Executor/ResultQueue.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <gtest/gtest.h>

Expand Down Expand Up @@ -44,29 +45,6 @@ try
}
CATCH

TEST_F(PipelineExecutorContextTestRunner, popTimeout)
try
{
PipelineExecutorContext context;
context.toConsumeMode(1);
try
{
context.incActiveRefCount();
std::chrono::milliseconds timeout(10);
ResultHandler result_handler{[](const Block &) {
}};
context.consumeFor(result_handler, timeout);
GTEST_FAIL();
}
catch (DB::Exception & e)
{
GTEST_ASSERT_EQ(e.message(), PipelineExecutorContext::timeout_err_msg);
auto err_msg = context.getExceptionMsg();
ASSERT_EQ(err_msg, PipelineExecutorContext::timeout_err_msg);
}
}
CATCH

TEST_F(PipelineExecutorContextTestRunner, run)
try
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#pragma once

#include <Flash/Executor/ResultHandler.h>
#include <Flash/Executor/ResultQueue.h>
#include <Flash/Executor/ResultQueue_fwd.h>
#include <Flash/Pipeline/Exec/PipelineExec.h>

#include <deque>
Expand Down
10 changes: 1 addition & 9 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,7 @@ class PipeConditionVariable
assert(task);
task->notify();
assert(TaskScheduler::instance);
if (unlikely(task->getStatus() == ExecTaskStatus::WAITING))
{
TaskScheduler::instance->submitToWaitReactor(std::move(task));
}
else
{
assert(task->getStatus() == ExecTaskStatus::RUNNING);
TaskScheduler::instance->submitToCPUTaskThreadPool(std::move(task));
}
TaskScheduler::instance->submit(std::move(task));
}

private:
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class PipelineTask

ExecTaskStatus awaitImpl() override { return runAwait(); }

void notifyImpl() override { runNotify(); }
ExecTaskStatus notifyImpl() override { return runNotify(); }

void doFinalizeImpl() override
{
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,11 @@ class PipelineTaskBase
pipeline_exec_holder.reset();
}

void runNotify()
ExecTaskStatus runNotify()
{
assert(pipeline_exec);
pipeline_exec->notify();
return ExecTaskStatus::RUNNING;
}

private:
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class RFWaitTask : public Task
int max_wait_time_ms,
RuntimeFilteList && waiting_rf_list_,
RuntimeFilteList && ready_rf_list_)
: Task(exec_context_, req_id)
: Task(exec_context_, req_id, ExecTaskStatus::WAITING)
, task_pool(task_pool_)
, max_wait_time_ns(max_wait_time_ms < 0 ? 0 : 1000000UL * max_wait_time_ms)
, waiting_rf_list(std::move(waiting_rf_list_))
Expand Down Expand Up @@ -75,7 +75,7 @@ class RFWaitTask : public Task
}

private:
ExecTaskStatus executeImpl() override { return ExecTaskStatus::WAITING; }
ExecTaskStatus executeImpl() override { throw Exception("unreachable"); }

ExecTaskStatus awaitImpl() override
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SimplePipelineTask

ExecTaskStatus awaitImpl() override { return runAwait(); }

void notifyImpl() override { runNotify(); }
ExecTaskStatus notifyImpl() override { return runNotify(); }

void finalizeImpl() override
{
Expand Down
Loading