Skip to content

Commit

Permalink
Pipeline: use notify instead of polling for UnorderedSourceOp (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
SeaRise authored Mar 29, 2024
1 parent 4440b5e commit aa67e0c
Show file tree
Hide file tree
Showing 18 changed files with 313 additions and 69 deletions.
5 changes: 4 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"pipeline scheduler", \
Gauge, \
F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \
F(type_wait_for_notify_tasks_count, {"type", "wait_for_notify_tasks_count"}), \
F(type_cpu_pending_tasks_count, {"type", "cpu_pending_tasks_count"}), \
F(type_cpu_executing_tasks_count, {"type", "cpu_executing_tasks_count"}), \
F(type_io_pending_tasks_count, {"type", "io_pending_tasks_count"}), \
Expand All @@ -755,7 +756,8 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_io_execute, {{"type", "io_execute"}}, ExpBuckets{0.005, 2, 20}), \
F(type_cpu_queue, {{"type", "cpu_queue"}}, ExpBuckets{0.005, 2, 20}), \
F(type_io_queue, {{"type", "io_queue"}}, ExpBuckets{0.005, 2, 20}), \
F(type_await, {{"type", "await"}}, ExpBuckets{0.005, 2, 20})) \
F(type_await, {{"type", "await"}}, ExpBuckets{0.005, 2, 20}), \
F(type_wait_for_notify, {{"type", "wait_for_notify"}}, ExpBuckets{0.005, 2, 20})) \
M(tiflash_pipeline_task_execute_max_time_seconds_per_round, \
"Bucketed histogram of pipeline task execute max time per round in seconds", \
Histogram, /* these command usually cost several hundred milliseconds to several seconds, increase the start bucket to 5ms */ \
Expand All @@ -765,6 +767,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"pipeline task change to status", \
Counter, \
F(type_to_waiting, {"type", "to_waiting"}), \
F(type_to_wait_for_notify, {"type", "to_wait_for_notify"}), \
F(type_to_running, {"type", "to_running"}), \
F(type_to_io, {"type", "to_io"}), \
F(type_to_finished, {"type", "to_finished"}), \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ extern const char random_pipeline_model_execute_suffix_failpoint[];
case OperatorStatus::WAITING: \
fillAwaitable((op).get()); \
return (op_status); \
/* For unexpected status, an immediate return is required. */ \
/* For other status, an immediate return is required. */ \
default: \
return (op_status); \
}
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Flash/Pipeline/Schedule/Reactor/WaitReactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Common/setThreadName.h>
#include <Flash/Pipeline/Schedule/Reactor/WaitReactor.h>
#include <Flash/Pipeline/Schedule/TaskScheduler.h>
#include <Flash/Pipeline/Schedule/Tasks/NotifyFuture.h>
#include <Flash/Pipeline/Schedule/Tasks/TaskHelper.h>
#include <common/logger_useful.h>
#include <errno.h>
Expand All @@ -29,6 +30,7 @@ WaitReactor::WaitReactor(TaskScheduler & scheduler_)
: scheduler(scheduler_)
{
GET_METRIC(tiflash_pipeline_scheduler, type_waiting_tasks_count).Set(0);
GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count).Set(0);
thread = std::thread(&WaitReactor::loop, this);
}

Expand All @@ -50,6 +52,9 @@ bool WaitReactor::awaitAndCollectReadyTask(WaitingTask && task)
task_ptr->profile_info.elapsedAwaitTime();
io_tasks.push_back(std::move(task.first));
return true;
case ExecTaskStatus::WAIT_FOR_NOTIFY:
registerTaskToFuture(std::move(task.first));
return true;
case FINISH_STATUS:
task_ptr->profile_info.elapsedAwaitTime();
task_ptr->startTraceMemory();
Expand Down Expand Up @@ -161,8 +166,8 @@ void WaitReactor::react(WaitingTasks & local_waiting_tasks)
++task_it;
}

#ifdef __APPLE__
auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_waiting_tasks_count);
#if __APPLE__ && __clang__
__thread auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_waiting_tasks_count);
#else
thread_local auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_waiting_tasks_count);
#endif
Expand Down
42 changes: 42 additions & 0 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Pipeline/Schedule/Tasks/NotifyFuture.h>

namespace DB
{
// Per-thread slot for the future that the current thread's next
// registerTaskToFuture() call will register its task to. It is filled by
// setNotifyFuture() and emptied by clearNotifyFuture() / registerTaskToFuture().
// NOTE(review): `__thread` is normally limited to constant-initialized,
// trivially destructible types, while NotifyFuturePtr is a std::shared_ptr —
// confirm that the Apple/clang branch actually compiles.
#if __APPLE__ && __clang__
__thread NotifyFuturePtr current_notify_future = nullptr;
#else
thread_local NotifyFuturePtr current_notify_future = nullptr;
#endif

/// Stores `new_future` in the calling thread's `current_notify_future` slot.
///
/// The slot is expected to be empty when this is called; that expectation is
/// only enforced by `assert`, i.e. in builds where asserts are enabled.
void setNotifyFuture(NotifyFuturePtr new_future)
{
    assert(!current_notify_future);
    // Exchange the (empty) slot with the by-value argument; whatever the slot
    // held before is released together with `new_future` on return.
    current_notify_future.swap(new_future);
}

/// Empties the calling thread's `current_notify_future` slot, dropping this
/// thread's reference to the stored future (if any).
void clearNotifyFuture()
{
    current_notify_future = nullptr;
}

/// Registers `task` to the future stored in the calling thread's
/// `current_notify_future` slot and leaves the slot empty.
///
/// A future must have been stored on this thread beforehand (see
/// setNotifyFuture); this is checked by `assert` only.
void registerTaskToFuture(TaskPtr && task)
{
    assert(current_notify_future != nullptr);
    // Detach the future from the thread-local slot *before* calling into it.
    // A moved-from shared_ptr is empty, so the slot is already cleared even if
    // registerTask() throws; previously a stale future would have been left
    // behind for the next setNotifyFuture() on this thread to trip over.
    NotifyFuturePtr future = std::move(current_notify_future);
    future->registerTask(std::move(task));
}
} // namespace DB
39 changes: 39 additions & 0 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/NotifyFuture.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Pipeline/Schedule/Tasks/Task.h>

namespace DB
{
/// Abstract interface of an object that tasks can be registered to.
/// registerTaskToFuture() forwards a task to the instance stored in the
/// thread-local `current_notify_future`.
struct NotifyFuture
{
NotifyFuture() = default;
// Virtual destructor: instances are held through std::shared_ptr<NotifyFuture>.
virtual ~NotifyFuture() = default;
/// Hands `task` over to this future. What an implementation does with the
/// task is not visible here — presumably it keeps the task until the awaited
/// event happens and then notifies it (TODO confirm against implementations).
virtual void registerTask(TaskPtr && task) = 0;
};
using NotifyFuturePtr = std::shared_ptr<NotifyFuture>;

/// Per-thread slot holding the future that the next registerTaskToFuture()
/// call on this thread will use.
/// NOTE(review): `__thread` is normally limited to constant-initialized,
/// trivially destructible types, while NotifyFuturePtr is a std::shared_ptr —
/// confirm that the Apple/clang branch actually compiles.
#if __APPLE__ && __clang__
extern __thread NotifyFuturePtr current_notify_future;
#else
extern thread_local NotifyFuturePtr current_notify_future;
#endif

/// Stores `new_future` in the calling thread's slot (the slot must be empty; asserted).
void setNotifyFuture(NotifyFuturePtr new_future);
/// Empties the calling thread's slot.
void clearNotifyFuture();
/// Registers `task` to the future in the calling thread's slot (must be set; asserted)
/// and empties the slot.
void registerTaskToFuture(TaskPtr && task);

} // namespace DB
100 changes: 100 additions & 0 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/PipeConditionVariable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Pipeline/Schedule/TaskScheduler.h>
#include <Flash/Pipeline/Schedule/Tasks/Task.h>

#include <deque>

namespace DB
{
class PipeConditionVariable
{
public:
inline void registerTask(TaskPtr && task)
{
assert(task);
assert(task->getStatus() == ExecTaskStatus::WAIT_FOR_NOTIFY);
{
std::lock_guard lock(mu);
tasks.push_back(std::move(task));
}

#if __APPLE__ && __clang__
__thread auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count);
#else
thread_local auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count);
#endif
metrics.Increment();
}

inline void notifyOne()
{
TaskPtr task;
{
std::lock_guard lock(mu);
if (tasks.empty())
return;
task = std::move(tasks.front());
tasks.pop_front();
}
assert(task);
notifyTaskDirectly(std::move(task));

#if __APPLE__ && __clang__
__thread auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count);
#else
thread_local auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count);
#endif
metrics.Decrement();
}

inline void notifyAll()
{
std::deque<TaskPtr> cur_tasks;
{
std::lock_guard lock(mu);
std::swap(cur_tasks, tasks);
}
size_t tasks_cnt = cur_tasks.size();
while (!cur_tasks.empty())
{
notifyTaskDirectly(std::move(cur_tasks.front()));
cur_tasks.pop_front();
}

#if __APPLE__ && __clang__
__thread auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count);
#else
thread_local auto & metrics = GET_METRIC(tiflash_pipeline_scheduler, type_wait_for_notify_tasks_count);
#endif
metrics.Decrement(tasks_cnt);
}

static inline void notifyTaskDirectly(TaskPtr && task)
{
assert(task);
task->notify();
task->profile_info.elapsedWaitForNotifyTime();
assert(TaskScheduler::instance);
TaskScheduler::instance->submitToCPUTaskThreadPool(std::move(task));
}

private:
std::mutex mu;
std::deque<TaskPtr> tasks;
};
} // namespace DB
4 changes: 3 additions & 1 deletion dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ class PipelineTask

/// Finalizes the task by calling runFinalize() exactly once with the sum of
/// the CPU pending time, the IO pending time, the wait-for-notify time and
/// the schedule duration.
void doFinalizeImpl() override
{
    // Only the call that also accounts for the wait-for-notify time is kept;
    // the older call without that term must not run in addition to it.
    runFinalize(
        profile_info.getCPUPendingTimeNs() + profile_info.getIOPendingTimeNs()
        + profile_info.getWaitForNotifyTimeNs() + getScheduleDuration());
}
};
} // namespace DB
44 changes: 24 additions & 20 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/PipelineTaskBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,30 @@ namespace DB
/// - OperatorStatus::WAITING ==> ExecTaskStatus::WAITING
/// - OperatorStatus::NEED_INPUT/HAS_OUTPUT ==> ExecTaskStatus::RUNNING

/// Case labels that map every "not running" OperatorStatus to the
/// ExecTaskStatus of the same name. Expand it inside a `switch` over an
/// OperatorStatus in a function returning ExecTaskStatus; statuses that are
/// not listed here fall through to the cases the caller adds afterwards.
#define MAP_NOT_RUNNING_TASK_STATUS             \
    case OperatorStatus::FINISHED:              \
    {                                           \
        return ExecTaskStatus::FINISHED;        \
    }                                           \
    case OperatorStatus::CANCELLED:             \
    {                                           \
        return ExecTaskStatus::CANCELLED;       \
    }                                           \
    case OperatorStatus::IO_IN:                 \
    {                                           \
        return ExecTaskStatus::IO_IN;           \
    }                                           \
    case OperatorStatus::IO_OUT:                \
    {                                           \
        return ExecTaskStatus::IO_OUT;          \
    }                                           \
    case OperatorStatus::WAITING:               \
    {                                           \
        return ExecTaskStatus::WAITING;         \
    }                                           \
    case OperatorStatus::WAIT_FOR_NOTIFY:       \
    {                                           \
        return ExecTaskStatus::WAIT_FOR_NOTIFY; \
    }

#define UNEXPECTED_OP_STATUS(op_status, function_name) \
Expand Down
Loading

0 comments on commit aa67e0c

Please sign in to comment.