From f00efe1a422d53d6bcfd3f1345ba5a013aea32c2 Mon Sep 17 00:00:00 2001
From: Lloyd-Pottiger
Date: Wed, 28 Aug 2024 16:54:02 +0800
Subject: [PATCH 1/7] storage: add local indexer scheduler

Signed-off-by: Lloyd-Pottiger
---
 .../DeltaMerge/LocalIndexerScheduler.cpp      | 397 ++++++++++++
 .../DeltaMerge/LocalIndexerScheduler.h        | 176 ++++++
 .../DeltaMerge/LocalIndexerScheduler_fwd.h    |  26 +
 .../tests/gtest_local_indexer_scheduler.cpp   | 588 ++++++++++++++++++
 4 files changed, 1187 insertions(+)
 create mode 100644 dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
 create mode 100644 dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
 create mode 100644 dbms/src/Storages/DeltaMerge/LocalIndexerScheduler_fwd.h
 create mode 100644 dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp

diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
new file mode 100644
index 00000000000..69132e3124b
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
@@ -0,0 +1,397 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <Common/Exception.h>
+#include <Common/Logger.h>
+#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
+
+namespace DB::DM
+{
+
+LocalIndexerScheduler::LocalIndexerScheduler(const Options & options)
+    : logger(Logger::get())
+    , pool(std::make_unique<ThreadPool>(options.pool_size, options.pool_size, options.pool_size + 1))
+    , pool_max_memory_limit(options.memory_limit)
+    , pool_current_memory(0)
+{
+    // QueueSize = PoolSize + 1, because our scheduler tries to schedule the next task
+    // right before the current task finishes.
+
+    LOG_INFO(
+        logger,
+        "Initialized LocalIndexerScheduler, pool_size={}, memory_limit_mb={:.1f}",
+        options.pool_size,
+        static_cast<double>(options.memory_limit) / 1024 / 1024);
+
+    if (options.auto_start)
+        start();
+}
+
+LocalIndexerScheduler::~LocalIndexerScheduler()
+{
+    LOG_INFO(logger, "LocalIndexerScheduler is destroying. Waiting for the scheduler and tasks to finish...");
+
+    // First quit the scheduler. Don't schedule more tasks.
+    is_shutting_down = true;
+    {
+        std::unique_lock lock(mutex);
+        scheduler_need_wakeup = true;
+        scheduler_notifier.notify_all();
+    }
+
+    if (is_started)
+        scheduler_thread.join();
+
+    // Then wait for all running tasks to finish.
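+    // (This relies on the ThreadPool destructor joining its worker threads, so
+    // resetting the pool blocks until every in-flight indexing job has returned.)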
+    pool.reset();
+
+    LOG_INFO(logger, "LocalIndexerScheduler is destroyed");
+}
+
+void LocalIndexerScheduler::start()
+{
+    if (is_started)
+        return;
+
+    scheduler_thread = std::thread([this]() { schedulerLoop(); });
+    is_started = true;
+}
+
+void LocalIndexerScheduler::waitForFinish()
+{
+    while (true)
+    {
+        std::unique_lock lock(mutex);
+        if (all_tasks_count == 0 && running_tasks_count == 0)
+            return;
+        on_finish_notifier.wait(lock);
+    }
+}
+
+void LocalIndexerScheduler::pushTask(const Task & task)
+{
+    if (pool_max_memory_limit > 0 && task.request_memory > pool_max_memory_limit)
+        throw Exception(fmt::format(
+            "Requested memory exceeds limit (request={} limit={})",
+            task.request_memory,
+            pool_max_memory_limit));
+
+    std::unique_lock lock(mutex);
+
+    const auto internal_task = std::make_shared<InternalTask>(InternalTask{
+        .user_task = task,
+        .created_at = Stopwatch(),
+        .scheduled_at = Stopwatch{}, // Not scheduled
+    });
+
+    // Whether task is ready is undertermined. It can be changed any time
+    // according to current running tasks.
+    // The scheduler will find a better place for this task when meeting it.
+    ready_tasks[task.keyspace_id][task.table_id].emplace_back(internal_task);
+    ++all_tasks_count;
+
+    scheduler_need_wakeup = true;
+    scheduler_notifier.notify_all();
+}
+
+size_t LocalIndexerScheduler::dropTasks(KeyspaceID keyspace_id, TableID table_id)
+{
+    size_t dropped_tasks = 0;
+
+    std::unique_lock lock(mutex);
+    auto it = ready_tasks.find(keyspace_id);
+    if (it != ready_tasks.end())
+    {
+        auto & tasks_by_table = it->second;
+        auto table_it = tasks_by_table.find(table_id);
+        if (table_it != tasks_by_table.end())
+        {
+            dropped_tasks += table_it->second.size();
+            tasks_by_table.erase(table_it);
+        }
+        if (tasks_by_table.empty())
+            ready_tasks.erase(it);
+    }
+    for (auto it = unready_tasks.begin(); it != unready_tasks.end();)
+    {
+        if ((*it)->user_task.keyspace_id == keyspace_id && (*it)->user_task.table_id == table_id)
+        {
+            it = unready_tasks.erase(it);
+            ++dropped_tasks;
+        }
+        else
+        {
+            it++;
+        }
+    }
+
+    LOG_INFO(logger, "Removed {} tasks, keyspace_id={} table_id={}", dropped_tasks, keyspace_id, table_id);
+
+    return dropped_tasks;
+}
+
+bool LocalIndexerScheduler::isTaskReady(std::unique_lock<std::mutex> &, const InternalTaskPtr & task)
+{
+    for (const auto & page_id : task->user_task.page_ids)
+    {
+        if (adding_index_page_id_set.find(page_id) != adding_index_page_id_set.end())
+            return false;
+    }
+    return true;
+}
+
+void LocalIndexerScheduler::taskOnSchedule(std::unique_lock<std::mutex> &, const InternalTaskPtr & task)
+{
+    for (const auto & page_id : task->user_task.page_ids)
+    {
+        auto [it, inserted] = adding_index_page_id_set.insert(page_id);
+        RUNTIME_CHECK(inserted);
+        UNUSED(it);
+    }
+
+    LOG_DEBUG( //
+        logger,
+        "Start LocalIndex task, keyspace_id={} table_id={} page_ids={} "
+        "memory_[this/total/limit]_mb={:.1f}/{:.1f}/{:.1f} all_tasks={}",
+        task->user_task.keyspace_id,
+        task->user_task.table_id,
+        task->user_task.page_ids,
+        static_cast<double>(task->user_task.request_memory) / 1024 / 1024,
+        static_cast<double>(pool_current_memory) / 1024 / 1024,
+        static_cast<double>(pool_max_memory_limit) / 1024 / 1024,
+        all_tasks_count);
+
+    // No need to update unready_tasks here, because we will update unready_tasks
+    // when iterating the full list.
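+    // (See moveBackReadyTasks(): it re-checks every entry of unready_tasks each
+    // time a task finishes, so a task parked there is picked up again automatically.)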
+}
+
+void LocalIndexerScheduler::taskOnFinish(std::unique_lock<std::mutex> & lock, const InternalTaskPtr & task)
+{
+    for (const auto & page_id : task->user_task.page_ids)
+    {
+        auto erased = adding_index_page_id_set.erase(page_id);
+        RUNTIME_CHECK(erased == 1, erased);
+    }
+
+    moveBackReadyTasks(lock);
+
+    auto elapsed_since_create = task->created_at.elapsedSeconds();
+    auto elapsed_since_schedule = task->scheduled_at.elapsedSeconds();
+
+    LOG_DEBUG( //
+        logger,
+        "Finish LocalIndex task, keyspace_id={} table_id={} page_ids={} "
+        "memory_[this/total/limit]_mb={:.1f}/{:.1f}/{:.1f} "
+        "[schedule/task]_cost_sec={:.1f}/{:.1f}",
+        task->user_task.keyspace_id,
+        task->user_task.table_id,
+        task->user_task.page_ids,
+        static_cast<double>(task->user_task.request_memory) / 1024 / 1024,
+        static_cast<double>(pool_current_memory) / 1024 / 1024,
+        static_cast<double>(pool_max_memory_limit) / 1024 / 1024,
+        elapsed_since_create - elapsed_since_schedule,
+        elapsed_since_schedule);
+}
+
+void LocalIndexerScheduler::moveBackReadyTasks(std::unique_lock<std::mutex> & lock)
+{
+    for (auto it = unready_tasks.begin(); it != unready_tasks.end();)
+    {
+        auto & task = *it;
+        if (isTaskReady(lock, task))
+        {
+            ready_tasks[task->user_task.keyspace_id][task->user_task.table_id].emplace_back(task);
+            it = unready_tasks.erase(it);
+        }
+        else
+        {
+            it++;
+        }
+    }
+}
+
+bool LocalIndexerScheduler::tryAddTaskToPool(std::unique_lock<std::mutex> & lock, const InternalTaskPtr & task)
+{
+    // Memory limit reached
+    if (pool_max_memory_limit > 0 && pool_current_memory + task->user_task.request_memory > pool_max_memory_limit)
+    {
+        return false;
+    }
+
+    auto real_job = [task, this]() {
+        SCOPE_EXIT({
+            std::unique_lock lock(mutex);
+            pool_current_memory -= task->user_task.request_memory;
+            running_tasks_count--;
+            taskOnFinish(lock, task);
+            on_finish_notifier.notify_all();
+
+            scheduler_need_wakeup = true;
+            scheduler_notifier.notify_all();
+        });
+
+        task->scheduled_at.start();
+
+        try
+        {
+            task->user_task.workload();
+        }
+        catch (...)
+        {
+            // TODO: We should ensure ADD INDEX task is retried somewhere else?
+            tryLogCurrentException(
+                logger,
+                fmt::format(
+                    "LocalIndexScheduler meet exception when running task: keyspace_id={} table_id={}",
+                    task->user_task.keyspace_id,
+                    task->user_task.table_id));
+        }
+    };
+
+    RUNTIME_CHECK(pool);
+    if (!pool->trySchedule(real_job))
+        // Concurrent task limit reached
+        return false;
+
+    ++running_tasks_count;
+    pool_current_memory += task->user_task.request_memory;
+    taskOnSchedule(lock, task);
+
+    return true;
+}
+
+LocalIndexerScheduler::ScheduleResult LocalIndexerScheduler::scheduleNextTask(std::unique_lock<std::mutex> & lock)
+{
+    if (ready_tasks.empty())
+        return ScheduleResult::FAIL_NO_TASK;
+
+    // Find the keyspace ID which is just > last_schedule_keyspace_id.
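+    // For example, with ready keyspaces {1, 3, 7} and last_schedule_keyspace_id == 3,
+    // upper_bound picks keyspace 7; once we move past the largest keyspace we wrap
+    // around to keyspace 1, giving a round-robin over keyspaces.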
+    auto keyspace_it = ready_tasks.upper_bound(last_schedule_keyspace_id);
+    if (keyspace_it == ready_tasks.end())
+        keyspace_it = ready_tasks.begin();
+    const KeyspaceID keyspace_id = keyspace_it->first;
+
+    auto & tasks_by_table = keyspace_it->second;
+    RUNTIME_CHECK(!tasks_by_table.empty());
+
+    TableID last_schedule_table_id = -1;
+    if (last_schedule_table_id_by_ks.find(keyspace_id) != last_schedule_table_id_by_ks.end())
+        last_schedule_table_id = last_schedule_table_id_by_ks[keyspace_id];
+
+    auto table_it = tasks_by_table.upper_bound(last_schedule_table_id);
+    if (table_it == tasks_by_table.end())
+        table_it = tasks_by_table.begin();
+    const TableID table_id = table_it->first;
+
+    auto & tasks = table_it->second;
+    RUNTIME_CHECK(!tasks.empty());
+    auto task_it = tasks.begin();
+    auto task = *task_it;
+
+    auto remove_current_task = [&]() {
+        tasks.erase(task_it);
+        if (tasks.empty())
+        {
+            tasks_by_table.erase(table_it);
+            if (tasks_by_table.empty())
+            {
+                ready_tasks.erase(keyspace_id);
+                last_schedule_table_id_by_ks.erase(keyspace_id);
+            }
+        }
+    };
+
+    if (!isTaskReady(lock, task))
+    {
+        // The task is not ready. Move it to unready_tasks.
+        unready_tasks.emplace_back(task);
+        remove_current_task();
+
+        LOG_DEBUG(
+            logger,
+            "LocalIndex task is not ready, will try again later when it is ready. "
+            "keyspace_id={} table_id={} page_ids={}",
+            task->user_task.keyspace_id,
+            task->user_task.table_id,
+            task->user_task.page_ids);
+
+        // Let the caller retry. At next retry, we will continue using this
+        // Keyspace+Table and try next task.
+        return ScheduleResult::RETRY;
+    }
+
+    auto ok = tryAddTaskToPool(lock, task);
+    if (!ok)
+        // The pool is full. May be memory limit reached or concurrent task limit reached.
+        // We will not try any more tasks.
+        // At next retry, we will continue using this Keyspace+Table and try next task.
+        return ScheduleResult::FAIL_FULL;
+
+    last_schedule_table_id_by_ks[keyspace_id] = table_id;
+    last_schedule_keyspace_id = keyspace_id;
+    remove_current_task();
+    all_tasks_count--;
+
+    return ScheduleResult::OK;
+}
+
+void LocalIndexerScheduler::schedulerLoop()
+{
+    while (true)
+    {
+        if (is_shutting_down)
+            return;
+
+        std::unique_lock lock(mutex);
+        scheduler_notifier.wait(lock, [&] { return scheduler_need_wakeup || is_shutting_down; });
+        scheduler_need_wakeup = false;
+
+        try
+        {
+            while (true)
+            {
+                if (is_shutting_down)
+                    return;
+
+                auto result = scheduleNextTask(lock);
+                if (result == ScheduleResult::FAIL_FULL)
+                {
+                    // Cannot schedule any more tasks, start to wait
+                    break;
+                }
+                else if (result == ScheduleResult::FAIL_NO_TASK)
+                {
+                    // No task to schedule, start to wait
+                    break;
+                }
+                else if (result == ScheduleResult::RETRY)
+                {
+                    // Retry scheduling again
+                }
+                else if (result == ScheduleResult::OK)
+                {
+                    // Task is scheduled, continue to schedule the next task
+                }
+            }
+        }
+        catch (...)
+        {
+            // Catch all exceptions to avoid the scheduler thread being terminated.
+            // We should log the exception here.
+            tryLogCurrentException(logger, __PRETTY_FUNCTION__);
+        }
+    }
+}
+
+} // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
new file mode 100644
index 00000000000..65474633fa7
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
@@ -0,0 +1,176 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <Common/Logger.h>
+#include <Common/UniThreadPool.h>
+#include <Storages/DeltaMerge/LocalIndexerScheduler_fwd.h>
+#include <Storages/KVStore/Types.h>
+#include <Storages/Page/PageDefinesBase.h>
+
+#include <mutex>
+#include <thread>
+
+namespace DB::DM
+{
+
+// Note: this scheduler is global in the TiFlash instance.
+class LocalIndexerScheduler
+{
+public:
+    struct Task
+    {
+        // Note: The scheduler will try to schedule farely according to keyspace_id and table_id.
+        KeyspaceID keyspace_id;
+        TableID table_id;
+
+        // Used for the scheduler to avoid concurrently adding index for the same Page (ColumnFileTiny/DMFile).
+        std::vector<PageIdU64> page_ids;
+
+        // Used for the scheduler to control the maximum requested memory usage.
+        size_t request_memory;
+
+        // The actual index setup workload.
+        // The scheduler does not care about the workload.
+        ThreadPool::Job workload;
+    };
+
+    struct Options
+    {
+        size_t pool_size = 1;
+        size_t memory_limit = 0; // 0 = unlimited
+        bool auto_start = true;
+    };
+
+private:
+    struct InternalTask
+    {
+        const Task user_task;
+        Stopwatch created_at{};
+        Stopwatch scheduled_at{};
+    };
+
+    using InternalTaskPtr = std::shared_ptr<InternalTask>;
+
+public:
+    static LocalIndexerSchedulerPtr create(const Options & options)
+    {
+        return std::make_shared<LocalIndexerScheduler>(options);
+    }
+
+    explicit LocalIndexerScheduler(const Options & options);
+
+    ~LocalIndexerScheduler();
+
+    /**
+     * @brief Start the scheduler. In some tests we need to start the scheduler
+     * after some tasks are pushed.
+     */
+    void start();
+
+    /**
+     * @brief Blocks until there are no tasks remaining in the queue and no running tasks.
+     * Should only be used in tests.
+     */
+    void waitForFinish();
+
+    /**
+     * @brief Push a task to the pool. The task may not be scheduled immediately.
+     */
+    void pushTask(const Task & task);
+
+    /**
+     * @brief Drop all tasks matching the specified keyspace id and table id.
+     */
+    size_t dropTasks(KeyspaceID keyspace_id, TableID table_id);
+
+private:
+    // The set of DMFiles that are currently adding index.
+    // There maybe multiple threads trying to add index for the same Page. For example,
+    // after logical split two segments share the same DMFile, so that adding index for the two segments
+    // could result in adding the same index for the same DMFile. It's just a waste of resource.
+    std::unordered_set<PageIdU64> adding_index_page_id_set;
+
+    bool isTaskReady(std::unique_lock<std::mutex> &, const InternalTaskPtr & task);
+
+    void taskOnSchedule(std::unique_lock<std::mutex> &, const InternalTaskPtr & task);
+
+    void taskOnFinish(std::unique_lock<std::mutex> & lock, const InternalTaskPtr & task);
+
+    void moveBackReadyTasks(std::unique_lock<std::mutex> & lock);
+
+private:
+    bool is_started = false;
+    std::thread scheduler_thread;
+
+    /// Try to add a task to the pool. Returns false if the pool is full
+    /// (for example, reaches concurrent task limit or memory limit).
+    /// When pool is full, we will not try to schedule any more tasks at this moment.
+    ///
+    /// Actually there may still be small tasks that could be scheduled when the
+    /// memory limit is reached, but scheduling them would make the scheduler tend
+    /// to only schedule small tasks, keeping large tasks starving under
+    /// heavy pressure.
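+    /// For example, with memory_limit = 100 and a task requesting 90 at the head
+    /// of the queue, we wait until the 90-unit task fits rather than skipping
+    /// ahead to a 10-unit task behind it.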
+    bool tryAddTaskToPool(std::unique_lock<std::mutex> & lock, const InternalTaskPtr & task);
+
+    KeyspaceID last_schedule_keyspace_id = 0;
+    std::map<KeyspaceID, TableID> last_schedule_table_id_by_ks;
+
+    enum class ScheduleResult
+    {
+        RETRY,
+        FAIL_FULL,
+        FAIL_NO_TASK,
+        OK,
+    };
+
+    ScheduleResult scheduleNextTask(std::unique_lock<std::mutex> & lock);
+
+    void schedulerLoop();
+
+private:
+    std::mutex mutex;
+
+    const LoggerPtr logger;
+
+    /// The thread pool for creating indices in the background.
+    std::unique_ptr<ThreadPool> pool;
+    /// The current memory usage of the pool. It is not accurate and the memory
+    /// is determined when the task is added to the pool.
+    const size_t pool_max_memory_limit;
+    size_t pool_current_memory = 0;
+
+    size_t all_tasks_count = 0; // ready_tasks + unready_tasks
+    /// Schedule farely according to keyspace_id, and then according to table_id.
+    std::map<KeyspaceID, std::map<TableID, std::list<InternalTaskPtr>>> ready_tasks{};
+    /// When the scheduler will stop waiting and try to schedule again?
+    /// 1. When a new task is added (and pool is not full)
+    /// 2. When a pool task is finished
+    std::condition_variable scheduler_notifier;
+    bool scheduler_need_wakeup = false; // Avoid false wake-ups.
+
+    /// Notified when one task is finished.
+    std::condition_variable on_finish_notifier;
+    size_t running_tasks_count = 0;
+
+    /// Some tasks cannot be scheduled at this moment. For example, its DMFile
+    /// is used in another index building task. These tasks are extracted
+    /// from ready_tasks and put into unready_tasks.
+    std::list<InternalTaskPtr> unready_tasks{};
+
+    std::atomic<bool> is_shutting_down = false;
+};
+
+} // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler_fwd.h b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler_fwd.h
new file mode 100644
index 00000000000..1f77cf2b321
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler_fwd.h
@@ -0,0 +1,26 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <memory>
+
+namespace DB::DM
+{
+
+class LocalIndexerScheduler;
+
+using LocalIndexerSchedulerPtr = std::shared_ptr<LocalIndexerScheduler>;
+
+} // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp
new file mode 100644
index 00000000000..794e54d8c08
--- /dev/null
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp
@@ -0,0 +1,588 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +#include +#include +#include + +#include +#include +#include + +namespace DB::DM::tests +{ + +class LocalIndexerSchedulerTest : public ::testing::Test +{ +protected: + void pushResult(String result) + { + std::unique_lock lock(results_mu); + results.push_back(result); + } + + std::mutex results_mu; + std::vector results; +}; + +TEST_F(LocalIndexerSchedulerTest, StartScheduler) +try +{ + auto scheduler = LocalIndexerScheduler::create({ + .pool_size = 5, + .auto_start = false, + }); + + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 0, + .workload = [this]() { pushResult("foo"); }, + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(results.size(), 0); + + scheduler.reset(); + ASSERT_EQ(results.size(), 0); + + scheduler = LocalIndexerScheduler::create({ + .pool_size = 5, + .auto_start = false, + }); + + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 0, + .workload = [this]() { pushResult("bar"); }, + }); + + scheduler->start(); + scheduler->waitForFinish(); + + ASSERT_EQ(1, results.size()); + ASSERT_STREQ("bar", results[0].c_str()); +} +CATCH + + +TEST_F(LocalIndexerSchedulerTest, KeyspaceFair) +try +{ + auto scheduler = LocalIndexerScheduler::create({ + .pool_size = 1, + .auto_start = false, + }); + + scheduler->pushTask({ + .keyspace_id = 2, + .table_id = 1, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("ks2_t1"); }, + }); + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 2, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("ks1_t2"); }, + }); + scheduler->pushTask({ + .keyspace_id = 3, + .table_id = 3, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("ks3_t3"); }, + }); + scheduler->pushTask({ + .keyspace_id = 2, + .table_id = 4, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("ks2_t4"); }, + }); + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("ks1_t1"); }, + }); + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 3, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("ks1_t3"); }, + }); + + scheduler->start(); + scheduler->waitForFinish(); + + // Scheduler is scheduled by KeyspaceID asc order and TableID asc order. 
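+    // With pool_size = 1, tasks run one at a time. The scheduler round-robins the
+    // keyspaces 1 -> 2 -> 3 and then wraps around, so the expected order is:
+    // ks1_t1, ks2_t1, ks3_t3, ks1_t2, ks2_t4, ks1_t3.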
+ ASSERT_EQ(results.size(), 6); + ASSERT_STREQ(results[0].c_str(), "ks1_t1"); + ASSERT_STREQ(results[1].c_str(), "ks2_t1"); + ASSERT_STREQ(results[2].c_str(), "ks3_t3"); + ASSERT_STREQ(results[3].c_str(), "ks1_t2"); + ASSERT_STREQ(results[4].c_str(), "ks2_t4"); + ASSERT_STREQ(results[5].c_str(), "ks1_t3"); + + results.clear(); + + scheduler->pushTask({ + .keyspace_id = 2, + .table_id = 1, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("ks2_t1"); }, + }); + + scheduler->waitForFinish(); + + ASSERT_EQ(results.size(), 1); + ASSERT_STREQ(results[0].c_str(), "ks2_t1"); +} +CATCH + + +TEST_F(LocalIndexerSchedulerTest, TableFair) +try +{ + auto scheduler = LocalIndexerScheduler::create({ + .pool_size = 1, + .auto_start = false, + }); + + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 3, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("ks1_t3_#1"); }, + }); + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("ks1_t1_#1"); }, + }); + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 3, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("ks1_t3_#2"); }, + }); + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 2, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("ks1_t2_#1"); }, + }); + scheduler->pushTask({ + .keyspace_id = 2, + .table_id = 1, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("ks2_t1_#1"); }, + }); + + scheduler->start(); + scheduler->waitForFinish(); + + // Scheduler is scheduled by KeyspaceID asc order and TableID asc order. + ASSERT_EQ(results.size(), 5); + ASSERT_STREQ(results[0].c_str(), "ks1_t1_#1"); + ASSERT_STREQ(results[1].c_str(), "ks2_t1_#1"); + ASSERT_STREQ(results[2].c_str(), "ks1_t2_#1"); + ASSERT_STREQ(results[3].c_str(), "ks1_t3_#1"); + ASSERT_STREQ(results[4].c_str(), "ks1_t3_#2"); +} +CATCH + + +TEST_F(LocalIndexerSchedulerTest, TaskExceedMemoryLimit) +try +{ + auto scheduler = LocalIndexerScheduler::create({ + .pool_size = 10, + .memory_limit = 2, + .auto_start = false, + }); + + ASSERT_THROW( + { + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 100, + .workload = [&]() { pushResult("foo"); }, + }); + }, + DB::Exception); + ASSERT_NO_THROW({ + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("bar"); }, + }); + }); + + scheduler->start(); + scheduler->waitForFinish(); + + ASSERT_EQ(results.size(), 1); + ASSERT_STREQ(results[0].c_str(), "bar"); + + results.clear(); + + scheduler = LocalIndexerScheduler::create({ + .pool_size = 10, + .memory_limit = 0, + }); + + ASSERT_NO_THROW({ + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 100, + .workload = [&]() { pushResult("foo"); }, + }); + }); + ASSERT_NO_THROW({ + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("bar"); }, + }); + }); + + scheduler->start(); + scheduler->waitForFinish(); + + ASSERT_EQ(results.size(), 2); + ASSERT_STREQ(results[0].c_str(), "foo"); + ASSERT_STREQ(results[1].c_str(), "bar"); +} +CATCH + + +TEST_F(LocalIndexerSchedulerTest, MemoryLimit) +try +{ + auto scheduler = LocalIndexerScheduler::create({ + .pool_size = 10, + .memory_limit = 2, + .auto_start = false, + }); + + auto task_1_is_started = 
std::make_shared>(); + auto task_2_is_started = std::make_shared>(); + auto task_3_is_started = std::make_shared>(); + + auto task_1_wait = std::make_shared>(); + auto task_2_wait = std::make_shared>(); + auto task_3_wait = std::make_shared>(); + + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 1, + .workload = + [=]() { + task_1_is_started->set_value(); + task_1_wait->get_future().wait(); + }, + }); + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 1, + .workload = + [=]() { + task_2_is_started->set_value(); + task_2_wait->get_future().wait(); + }, + }); + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 1, + .workload = + [=]() { + task_3_is_started->set_value(); + task_3_wait->get_future().wait(); + }, + }); + + scheduler->start(); + + task_1_is_started->get_future().wait(); + task_2_is_started->get_future().wait(); + + auto task_3_is_started_future = task_3_is_started->get_future(); + + // We should fail to got task 3 start running, because current memory limit is reached + ASSERT_EQ(task_3_is_started_future.wait_for(std::chrono::milliseconds(500)), std::future_status::timeout); + + task_1_wait->set_value(); + + task_3_is_started_future.wait(); + + task_2_wait->set_value(); + task_3_wait->set_value(); +} +CATCH + + +TEST_F(LocalIndexerSchedulerTest, ShutdownWithPendingTasks) +try +{ + auto scheduler = LocalIndexerScheduler::create({ + .pool_size = 1, + .auto_start = false, + }); + + auto task_1_is_started = std::make_shared>(); + auto task_1_wait = std::make_shared>(); + + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 0, + .workload = + [=]() { + task_1_is_started->set_value(); + task_1_wait->get_future().wait(); + }, + }); + + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 0, + .workload = + [=]() { + // Should not enter here. + ASSERT_TRUE(false); + }, + }); + + scheduler->start(); + + // Ensure task 1 is running + task_1_is_started->get_future().wait(); + + // Shutdown the scheduler. + auto shutdown_th = std::async([&]() { scheduler.reset(); }); + + // The shutdown should be waiting for task 1 to finish + ASSERT_EQ(shutdown_th.wait_for(std::chrono::milliseconds(500)), std::future_status::timeout); + + // After task 1 finished, the scheduler shutdown should be ok. 
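+    // The second pending task must never run: the destructor sets is_shutting_down
+    // before joining the scheduler thread, so no new task is handed to the pool.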
+ task_1_wait->set_value(); + shutdown_th.wait(); +} +CATCH + + +TEST_F(LocalIndexerSchedulerTest, WorkloadException) +try +{ + auto scheduler = LocalIndexerScheduler::create({ + .pool_size = 1, + .auto_start = false, + }); + + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { throw DB::Exception("foo"); }, + }); + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {}, + .request_memory = 0, + .workload = [&]() { pushResult("bar"); }, + }); + + scheduler->start(); + scheduler->waitForFinish(); + + ASSERT_EQ(results.size(), 1); + ASSERT_STREQ(results[0].c_str(), "bar"); +} +CATCH + + +TEST_F(LocalIndexerSchedulerTest, DMFileIsUsing) +try +{ + auto scheduler = LocalIndexerScheduler::create({ + .pool_size = 4, + .auto_start = false, + }); + + auto task_1_is_started = std::make_shared>(); + auto task_2_is_started = std::make_shared>(); + auto task_3_is_started = std::make_shared>(); + + auto task_1_wait = std::make_shared>(); + + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {1}, + .request_memory = 0, + .workload = + [&]() { + task_1_is_started->set_value(); + task_1_wait->get_future().wait(); + }, + }); + + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {1, 2}, + .request_memory = 0, + .workload = [&]() { task_2_is_started->set_value(); }, + }); + + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {3}, + .request_memory = 0, + .workload = [&]() { task_3_is_started->set_value(); }, + }); + + scheduler->start(); + + task_1_is_started->get_future().wait(); + + auto task_2_is_started_future = task_2_is_started->get_future(); + // We should fail to got task 2 start running, because current dmfile is using + ASSERT_EQ(task_2_is_started_future.wait_for(std::chrono::milliseconds(500)), std::future_status::timeout); + // Task 3 is not using the dmfile, so it should run + task_3_is_started->get_future().wait(); + + // After task 1 is finished, task 2 should run + task_1_wait->set_value(); + task_2_is_started_future.wait(); + + scheduler->waitForFinish(); +} +CATCH + + +TEST_F(LocalIndexerSchedulerTest, DMFileFromDifferentTable) +try +{ + // When DMFiles come from different table, should not block + + auto scheduler = LocalIndexerScheduler::create({ + .pool_size = 4, + .auto_start = false, + }); + + auto task_1_is_started = std::make_shared>(); + auto task_2_is_started = std::make_shared>(); + + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {1}, + .request_memory = 0, + .workload = [&]() { task_1_is_started->set_value(); }, + }); + + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 2, + .page_ids = {1, 2}, + .request_memory = 0, + .workload = [&]() { task_2_is_started->set_value(); }, + }); + + scheduler->start(); + + task_1_is_started->get_future().wait(); + task_2_is_started->get_future().wait(); + + scheduler->waitForFinish(); +} +CATCH + + +TEST_F(LocalIndexerSchedulerTest, DMFileFromDifferentKeyspace) +try +{ + // When DMFiles come from different keyspace, should not block + + auto scheduler = LocalIndexerScheduler::create({ + .pool_size = 4, + .auto_start = false, + }); + + auto task_1_is_started = std::make_shared>(); + auto task_2_is_started = std::make_shared>(); + + scheduler->pushTask({ + .keyspace_id = 1, + .table_id = 1, + .page_ids = {1}, + .request_memory = 0, + .workload = [&]() { task_1_is_started->set_value(); }, + }); + + scheduler->pushTask({ + .keyspace_id = 2, + .table_id = 1, + 
.page_ids = {1}, + .request_memory = 0, + .workload = [&]() { task_2_is_started->set_value(); }, + }); + + scheduler->start(); + + task_1_is_started->get_future().wait(); + task_2_is_started->get_future().wait(); + + scheduler->waitForFinish(); +} +CATCH + +} // namespace DB::DM::tests From 1bbaaa09d3d0ccb21a5ee9f7cbd285006c5fc3d0 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Wed, 28 Aug 2024 17:47:31 +0800 Subject: [PATCH 2/7] fix Signed-off-by: Lloyd-Pottiger --- .../DeltaMerge/LocalIndexerScheduler.cpp | 18 +++- .../DeltaMerge/LocalIndexerScheduler.h | 37 ++++++- .../tests/gtest_local_indexer_scheduler.cpp | 101 +++++------------- 3 files changed, 78 insertions(+), 78 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp index 69132e3124b..d981a3b3ea0 100644 --- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp @@ -145,7 +145,11 @@ bool LocalIndexerScheduler::isTaskReady(std::unique_lock &, const In { for (const auto & page_id : task->user_task.page_ids) { - if (adding_index_page_id_set.find(page_id) != adding_index_page_id_set.end()) + auto unique_page_id = UniquePageID{ + .page_type = task->user_task.page_type, + .page_id = page_id, + }; + if (adding_index_page_id_set.find(unique_page_id) != adding_index_page_id_set.end()) return false; } return true; @@ -155,7 +159,11 @@ void LocalIndexerScheduler::taskOnSchedule(std::unique_lock &, const { for (const auto & page_id : task->user_task.page_ids) { - auto [it, inserted] = adding_index_page_id_set.insert(page_id); + auto unique_page_id = UniquePageID{ + .page_type = task->user_task.page_type, + .page_id = page_id, + }; + auto [it, inserted] = adding_index_page_id_set.insert(unique_page_id); RUNTIME_CHECK(inserted); UNUSED(it); } @@ -180,7 +188,11 @@ void LocalIndexerScheduler::taskOnFinish(std::unique_lock & lock, co { for (const auto & page_id : task->user_task.page_ids) { - auto erased = adding_index_page_id_set.erase(page_id); + auto unique_page_id = UniquePageID{ + .page_type = task->user_task.page_type, + .page_id = page_id, + }; + auto erased = adding_index_page_id_set.erase(unique_page_id); RUNTIME_CHECK(erased == 1, erased); } diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h index 65474633fa7..baa8506cede 100644 --- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h +++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h @@ -35,6 +35,12 @@ class LocalIndexerScheduler // Note: The scheduler will try to schedule farely according to keyspace_id and table_id. KeyspaceID keyspace_id; TableID table_id; + enum class PageType : UInt8 + { + DMFile = 0, + ColumnFileTiny = 1, + }; + PageType page_type; // Used for the scheduler to avoid concurrently adding index for the same Page (ColumnFileTiny/DMFile). std::vector page_ids; @@ -97,11 +103,38 @@ class LocalIndexerScheduler size_t dropTasks(KeyspaceID keyspace_id, TableID table_id); private: - // The set of DMFiles that are currently adding index. + struct UniquePageID + { + // We could use DMFile path as well, but this should be faster. 
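+        // A bare page id is not unique across page types: a DMFile and a
+        // ColumnFileTiny may carry the same numeric id, hence the
+        // (page_type, page_id) pair.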
+ + Task::PageType page_type; + PageIdU64 page_id; + + bool operator==(const UniquePageID & other) const + { + return page_type == other.page_type && page_id == other.page_id; + } + }; + + struct UniquePageIDHasher + { + std::size_t operator()(const UniquePageID & id) const + { + using boost::hash_combine; + using boost::hash_value; + + std::size_t seed = 0; + hash_combine(seed, hash_value(id.page_type)); + hash_combine(seed, hash_value(id.page_id)); + return seed; + } + }; + + // The set of Page that are currently adding index. // There maybe multiple threads trying to add index for the same Page. For example, // after logical split two segments share the same DMFile, so that adding index for the two segments // could result in adding the same index for the same DMFile. It's just a waste of resource. - std::unordered_set adding_index_page_id_set; + std::unordered_set adding_index_page_id_set; bool isTaskReady(std::unique_lock &, const InternalTaskPtr & task); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp index 794e54d8c08..547790a66ac 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp @@ -79,7 +79,6 @@ try } CATCH - TEST_F(LocalIndexerSchedulerTest, KeyspaceFair) try { @@ -91,42 +90,42 @@ try scheduler->pushTask({ .keyspace_id = 2, .table_id = 1, - .page_ids = {}, + .page_ids = {1}, .request_memory = 0, .workload = [&]() { pushResult("ks2_t1"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 2, - .page_ids = {}, + .page_ids = {2}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t2"); }, }); scheduler->pushTask({ .keyspace_id = 3, .table_id = 3, - .page_ids = {}, + .page_ids = {3}, .request_memory = 0, .workload = [&]() { pushResult("ks3_t3"); }, }); scheduler->pushTask({ .keyspace_id = 2, .table_id = 4, - .page_ids = {}, + .page_ids = {4}, .request_memory = 0, .workload = [&]() { pushResult("ks2_t4"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .page_ids = {5}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t1"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 3, - .page_ids = {}, + .page_ids = {6}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t3"); }, }); @@ -148,7 +147,7 @@ try scheduler->pushTask({ .keyspace_id = 2, .table_id = 1, - .page_ids = {}, + .page_ids = {1}, .request_memory = 0, .workload = [&]() { pushResult("ks2_t1"); }, }); @@ -160,7 +159,6 @@ try } CATCH - TEST_F(LocalIndexerSchedulerTest, TableFair) try { @@ -172,35 +170,35 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 3, - .page_ids = {}, + .page_ids = {1}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t3_#1"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .page_ids = {2}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t1_#1"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 3, - .page_ids = {}, + .page_ids = {3}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t3_#2"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 2, - .page_ids = {}, + .page_ids = {4}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t2_#1"); }, }); scheduler->pushTask({ .keyspace_id = 2, .table_id = 1, - .page_ids = {}, + .page_ids = {5}, .request_memory = 0, .workload = [&]() { pushResult("ks2_t1_#1"); }, }); @@ -218,7 +216,6 @@ try } CATCH - 
TEST_F(LocalIndexerSchedulerTest, TaskExceedMemoryLimit) try { @@ -233,7 +230,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .page_ids = {1}, .request_memory = 100, .workload = [&]() { pushResult("foo"); }, }); @@ -243,7 +240,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .page_ids = {2}, .request_memory = 0, .workload = [&]() { pushResult("bar"); }, }); @@ -266,7 +263,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .page_ids = {3}, .request_memory = 100, .workload = [&]() { pushResult("foo"); }, }); @@ -275,7 +272,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .page_ids = {4}, .request_memory = 0, .workload = [&]() { pushResult("bar"); }, }); @@ -290,7 +287,6 @@ try } CATCH - TEST_F(LocalIndexerSchedulerTest, MemoryLimit) try { @@ -311,7 +307,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .page_ids = {1}, .request_memory = 1, .workload = [=]() { @@ -322,7 +318,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .page_ids = {2}, .request_memory = 1, .workload = [=]() { @@ -333,7 +329,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .page_ids = {3}, .request_memory = 1, .workload = [=]() { @@ -361,7 +357,6 @@ try } CATCH - TEST_F(LocalIndexerSchedulerTest, ShutdownWithPendingTasks) try { @@ -376,7 +371,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .page_ids = {1}, .request_memory = 0, .workload = [=]() { @@ -388,7 +383,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .page_ids = {1}, .request_memory = 0, .workload = [=]() { @@ -414,7 +409,6 @@ try } CATCH - TEST_F(LocalIndexerSchedulerTest, WorkloadException) try { @@ -426,14 +420,14 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .page_ids = {1}, .request_memory = 0, .workload = [&]() { throw DB::Exception("foo"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .page_ids = {2}, .request_memory = 0, .workload = [&]() { pushResult("bar"); }, }); @@ -446,8 +440,7 @@ try } CATCH - -TEST_F(LocalIndexerSchedulerTest, DMFileIsUsing) +TEST_F(LocalIndexerSchedulerTest, PageIsUsing) try { auto scheduler = LocalIndexerScheduler::create({ @@ -507,11 +500,10 @@ try } CATCH - -TEST_F(LocalIndexerSchedulerTest, DMFileFromDifferentTable) +TEST_F(LocalIndexerSchedulerTest, DifferentTypePage) try { - // When DMFiles come from different table, should not block + // When pages are different type, should not block auto scheduler = LocalIndexerScheduler::create({ .pool_size = 4, @@ -524,6 +516,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, + .page_type = LocalIndexerScheduler::Task::PageType::DMFile, .page_ids = {1}, .request_memory = 0, .workload = [&]() { task_1_is_started->set_value(); }, @@ -531,46 +524,8 @@ try scheduler->pushTask({ .keyspace_id = 1, - .table_id = 2, - .page_ids = {1, 2}, - .request_memory = 0, - .workload = [&]() { task_2_is_started->set_value(); }, - }); - - scheduler->start(); - - task_1_is_started->get_future().wait(); - task_2_is_started->get_future().wait(); - - scheduler->waitForFinish(); -} -CATCH - - -TEST_F(LocalIndexerSchedulerTest, DMFileFromDifferentKeyspace) -try -{ - // When DMFiles come from different keyspace, should not block - - auto scheduler = LocalIndexerScheduler::create({ - .pool_size = 4, - .auto_start = false, - 
}); - - auto task_1_is_started = std::make_shared>(); - auto task_2_is_started = std::make_shared>(); - - scheduler->pushTask({ - .keyspace_id = 1, - .table_id = 1, - .page_ids = {1}, - .request_memory = 0, - .workload = [&]() { task_1_is_started->set_value(); }, - }); - - scheduler->pushTask({ - .keyspace_id = 2, .table_id = 1, + .page_type = LocalIndexerScheduler::Task::PageType::ColumnFileTiny, .page_ids = {1}, .request_memory = 0, .workload = [&]() { task_2_is_started->set_value(); }, From fbb0199651c781ea389856785c4a69d299333bfd Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Thu, 29 Aug 2024 11:14:17 +0800 Subject: [PATCH 3/7] rename Signed-off-by: Lloyd-Pottiger --- .../DeltaMerge/LocalIndexerScheduler.cpp | 54 ++++++++------- .../DeltaMerge/LocalIndexerScheduler.h | 66 +++++++++++------- .../tests/gtest_local_indexer_scheduler.cpp | 68 +++++++++---------- 3 files changed, 103 insertions(+), 85 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp index d981a3b3ea0..c29c4b1d945 100644 --- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp @@ -19,6 +19,24 @@ namespace DB::DM { +bool operator==(const LocalIndexerScheduler::FileID & lhs, const LocalIndexerScheduler::FileID & rhs) +{ + if (lhs.index() != rhs.index()) + return false; + + auto index = lhs.index(); + if (index == 0) + { + return std::get(lhs).id == std::get(rhs).id; + } + else if (index == 1) + { + return std::get(lhs).id + == std::get(rhs).id; + } + return false; +} + LocalIndexerScheduler::LocalIndexerScheduler(const Options & options) : logger(Logger::get()) , pool(std::make_unique(options.pool_size, options.pool_size, options.pool_size + 1)) @@ -143,13 +161,9 @@ size_t LocalIndexerScheduler::dropTasks(KeyspaceID keyspace_id, TableID table_id bool LocalIndexerScheduler::isTaskReady(std::unique_lock &, const InternalTaskPtr & task) { - for (const auto & page_id : task->user_task.page_ids) + for (const auto & file_id : task->user_task.file_ids) { - auto unique_page_id = UniquePageID{ - .page_type = task->user_task.page_type, - .page_id = page_id, - }; - if (adding_index_page_id_set.find(unique_page_id) != adding_index_page_id_set.end()) + if (adding_index_page_id_set.find(file_id) != adding_index_page_id_set.end()) return false; } return true; @@ -157,24 +171,20 @@ bool LocalIndexerScheduler::isTaskReady(std::unique_lock &, const In void LocalIndexerScheduler::taskOnSchedule(std::unique_lock &, const InternalTaskPtr & task) { - for (const auto & page_id : task->user_task.page_ids) + for (const auto & file_id : task->user_task.file_ids) { - auto unique_page_id = UniquePageID{ - .page_type = task->user_task.page_type, - .page_id = page_id, - }; - auto [it, inserted] = adding_index_page_id_set.insert(unique_page_id); + auto [it, inserted] = adding_index_page_id_set.insert(file_id); RUNTIME_CHECK(inserted); UNUSED(it); } LOG_DEBUG( // logger, - "Start LocalIndex task, keyspace_id={} table_id={} page_ids={} " + "Start LocalIndex task, keyspace_id={} table_id={} file_ids={} " "memory_[this/total/limit]_mb={:.1f}/{:.1f}/{:.1f} all_tasks={}", task->user_task.keyspace_id, task->user_task.table_id, - task->user_task.page_ids, + task->user_task.file_ids, static_cast(task->user_task.request_memory) / 1024 / 1024, static_cast(pool_current_memory) / 1024 / 1024, static_cast(pool_max_memory_limit) / 1024 / 1024, @@ -186,13 +196,9 @@ void 
LocalIndexerScheduler::taskOnSchedule(std::unique_lock &, const void LocalIndexerScheduler::taskOnFinish(std::unique_lock & lock, const InternalTaskPtr & task) { - for (const auto & page_id : task->user_task.page_ids) + for (const auto & file_id : task->user_task.file_ids) { - auto unique_page_id = UniquePageID{ - .page_type = task->user_task.page_type, - .page_id = page_id, - }; - auto erased = adding_index_page_id_set.erase(unique_page_id); + auto erased = adding_index_page_id_set.erase(file_id); RUNTIME_CHECK(erased == 1, erased); } @@ -203,12 +209,12 @@ void LocalIndexerScheduler::taskOnFinish(std::unique_lock & lock, co LOG_DEBUG( // logger, - "Finish LocalIndex task, keyspace_id={} table_id={} page_ids={} " + "Finish LocalIndex task, keyspace_id={} table_id={} file_ids={} " "memory_[this/total/limit]_mb={:.1f}/{:.1f}/{:.1f} " "[schedule/task]_cost_sec={:.1f}/{:.1f}", task->user_task.keyspace_id, task->user_task.table_id, - task->user_task.page_ids, + task->user_task.file_ids, static_cast(task->user_task.request_memory) / 1024 / 1024, static_cast(pool_current_memory) / 1024 / 1024, static_cast(pool_max_memory_limit) / 1024 / 1024, @@ -333,10 +339,10 @@ LocalIndexerScheduler::ScheduleResult LocalIndexerScheduler::scheduleNextTask(st LOG_DEBUG( logger, "LocalIndex task is not ready, will try again later when it is ready. " - "keyspace_id={} table_id={} page_ids={}", + "keyspace_id={} table_id={} file_ids={}", task->user_task.keyspace_id, task->user_task.table_id, - task->user_task.page_ids); + task->user_task.file_ids); // Let the caller retry. At next retry, we will continue using this // Keyspace+Table and try next task. diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h index baa8506cede..0ef1cbeb7b8 100644 --- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h +++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h @@ -30,20 +30,33 @@ namespace DB::DM class LocalIndexerScheduler { public: + // The file id of the DMFile. + struct DMFileID + { + explicit DMFileID(PageIdU64 id_) + : id(id_) + {} + PageIdU64 id; + }; + // The page id of the ColumnFileTiny. + struct ColumnFileTinyID + { + explicit ColumnFileTinyID(PageIdU64 id_) + : id(id_) + {} + PageIdU64 id; + }; + using FileID = std::variant; + struct Task { // Note: The scheduler will try to schedule farely according to keyspace_id and table_id. KeyspaceID keyspace_id; TableID table_id; - enum class PageType : UInt8 - { - DMFile = 0, - ColumnFileTiny = 1, - }; - PageType page_type; - // Used for the scheduler to avoid concurrently adding index for the same Page (ColumnFileTiny/DMFile). - std::vector page_ids; + // The file id of the ColumnFileTiny or DMFile. + // Used for the scheduler to avoid concurrently adding index for the same file. + std::vector file_ids; // Used for the scheduler to control the maximum requested memory usage. size_t request_memory; @@ -103,29 +116,16 @@ class LocalIndexerScheduler size_t dropTasks(KeyspaceID keyspace_id, TableID table_id); private: - struct UniquePageID - { - // We could use DMFile path as well, but this should be faster. 
- - Task::PageType page_type; - PageIdU64 page_id; - - bool operator==(const UniquePageID & other) const - { - return page_type == other.page_type && page_id == other.page_id; - } - }; - - struct UniquePageIDHasher + struct FileIDHasher { - std::size_t operator()(const UniquePageID & id) const + std::size_t operator()(const FileID & id) const { using boost::hash_combine; using boost::hash_value; std::size_t seed = 0; - hash_combine(seed, hash_value(id.page_type)); - hash_combine(seed, hash_value(id.page_id)); + hash_combine(seed, hash_value(id.index())); + hash_combine(seed, hash_value(std::visit([](const auto & id) { return id.id; }, id))); return seed; } }; @@ -134,7 +134,7 @@ class LocalIndexerScheduler // There maybe multiple threads trying to add index for the same Page. For example, // after logical split two segments share the same DMFile, so that adding index for the two segments // could result in adding the same index for the same DMFile. It's just a waste of resource. - std::unordered_set adding_index_page_id_set; + std::unordered_set adding_index_page_id_set; bool isTaskReady(std::unique_lock &, const InternalTaskPtr & task); @@ -206,4 +206,18 @@ class LocalIndexerScheduler std::atomic is_shutting_down = false; }; +bool operator==(const LocalIndexerScheduler::FileID & lhs, const LocalIndexerScheduler::FileID & rhs); + } // namespace DB::DM + +template <> +struct fmt::formatter +{ + static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); } + + template + auto format(const DB::DM::LocalIndexerScheduler::FileID & id, FormatContext & ctx) const -> decltype(ctx.out()) + { + return fmt::format_to(ctx.out(), "{}", std::visit([](const auto & id) { return id.id; }, id)); + } +}; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp index 547790a66ac..dbd53d3b00a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_local_indexer_scheduler.cpp @@ -47,7 +47,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .file_ids = {}, .request_memory = 0, .workload = [this]() { pushResult("foo"); }, }); @@ -66,7 +66,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {}, + .file_ids = {}, .request_memory = 0, .workload = [this]() { pushResult("bar"); }, }); @@ -90,42 +90,42 @@ try scheduler->pushTask({ .keyspace_id = 2, .table_id = 1, - .page_ids = {1}, + .file_ids = {LocalIndexerScheduler::DMFileID(1)}, .request_memory = 0, .workload = [&]() { pushResult("ks2_t1"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 2, - .page_ids = {2}, + .file_ids = {LocalIndexerScheduler::DMFileID(2)}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t2"); }, }); scheduler->pushTask({ .keyspace_id = 3, .table_id = 3, - .page_ids = {3}, + .file_ids = {LocalIndexerScheduler::DMFileID(3)}, .request_memory = 0, .workload = [&]() { pushResult("ks3_t3"); }, }); scheduler->pushTask({ .keyspace_id = 2, .table_id = 4, - .page_ids = {4}, + .file_ids = {LocalIndexerScheduler::DMFileID(4)}, .request_memory = 0, .workload = [&]() { pushResult("ks2_t4"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {5}, + .file_ids = {LocalIndexerScheduler::DMFileID(5)}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t1"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 3, - .page_ids = {6}, + .file_ids = 
{LocalIndexerScheduler::DMFileID(6)}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t3"); }, }); @@ -147,7 +147,7 @@ try scheduler->pushTask({ .keyspace_id = 2, .table_id = 1, - .page_ids = {1}, + .file_ids = {LocalIndexerScheduler::DMFileID(1)}, .request_memory = 0, .workload = [&]() { pushResult("ks2_t1"); }, }); @@ -170,35 +170,35 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 3, - .page_ids = {1}, + .file_ids = {LocalIndexerScheduler::DMFileID(1)}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t3_#1"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {2}, + .file_ids = {LocalIndexerScheduler::DMFileID(2)}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t1_#1"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 3, - .page_ids = {3}, + .file_ids = {LocalIndexerScheduler::DMFileID(3)}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t3_#2"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 2, - .page_ids = {4}, + .file_ids = {LocalIndexerScheduler::DMFileID(4)}, .request_memory = 0, .workload = [&]() { pushResult("ks1_t2_#1"); }, }); scheduler->pushTask({ .keyspace_id = 2, .table_id = 1, - .page_ids = {5}, + .file_ids = {LocalIndexerScheduler::DMFileID(5)}, .request_memory = 0, .workload = [&]() { pushResult("ks2_t1_#1"); }, }); @@ -230,7 +230,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {1}, + .file_ids = {LocalIndexerScheduler::DMFileID(1)}, .request_memory = 100, .workload = [&]() { pushResult("foo"); }, }); @@ -240,7 +240,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {2}, + .file_ids = {LocalIndexerScheduler::DMFileID(2)}, .request_memory = 0, .workload = [&]() { pushResult("bar"); }, }); @@ -263,7 +263,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {3}, + .file_ids = {LocalIndexerScheduler::DMFileID(3)}, .request_memory = 100, .workload = [&]() { pushResult("foo"); }, }); @@ -272,7 +272,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {4}, + .file_ids = {LocalIndexerScheduler::DMFileID(4)}, .request_memory = 0, .workload = [&]() { pushResult("bar"); }, }); @@ -307,7 +307,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {1}, + .file_ids = {LocalIndexerScheduler::DMFileID(1)}, .request_memory = 1, .workload = [=]() { @@ -318,7 +318,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {2}, + .file_ids = {LocalIndexerScheduler::DMFileID(2)}, .request_memory = 1, .workload = [=]() { @@ -329,7 +329,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {3}, + .file_ids = {LocalIndexerScheduler::DMFileID(3)}, .request_memory = 1, .workload = [=]() { @@ -371,7 +371,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {1}, + .file_ids = {LocalIndexerScheduler::DMFileID(1)}, .request_memory = 0, .workload = [=]() { @@ -383,7 +383,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {1}, + .file_ids = {LocalIndexerScheduler::DMFileID(1)}, .request_memory = 0, .workload = [=]() { @@ -420,14 +420,14 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {1}, + .file_ids = {LocalIndexerScheduler::DMFileID(1)}, .request_memory = 0, .workload = [&]() { throw DB::Exception("foo"); }, }); scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {2}, + .file_ids = {LocalIndexerScheduler::DMFileID(2)}, .request_memory = 0, 
.workload = [&]() { pushResult("bar"); }, }); @@ -440,7 +440,7 @@ try } CATCH -TEST_F(LocalIndexerSchedulerTest, PageIsUsing) +TEST_F(LocalIndexerSchedulerTest, FileIsUsing) try { auto scheduler = LocalIndexerScheduler::create({ @@ -457,7 +457,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {1}, + .file_ids = {LocalIndexerScheduler::DMFileID(1)}, .request_memory = 0, .workload = [&]() { @@ -469,7 +469,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {1, 2}, + .file_ids = {LocalIndexerScheduler::DMFileID(1), LocalIndexerScheduler::DMFileID(2)}, .request_memory = 0, .workload = [&]() { task_2_is_started->set_value(); }, }); @@ -477,7 +477,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_ids = {3}, + .file_ids = {LocalIndexerScheduler::DMFileID(3)}, .request_memory = 0, .workload = [&]() { task_3_is_started->set_value(); }, }); @@ -500,10 +500,10 @@ try } CATCH -TEST_F(LocalIndexerSchedulerTest, DifferentTypePage) +TEST_F(LocalIndexerSchedulerTest, DifferentTypeFile) try { - // When pages are different type, should not block + // When files are different type, should not block auto scheduler = LocalIndexerScheduler::create({ .pool_size = 4, @@ -516,8 +516,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_type = LocalIndexerScheduler::Task::PageType::DMFile, - .page_ids = {1}, + .file_ids = {LocalIndexerScheduler::DMFileID(1)}, .request_memory = 0, .workload = [&]() { task_1_is_started->set_value(); }, }); @@ -525,8 +524,7 @@ try scheduler->pushTask({ .keyspace_id = 1, .table_id = 1, - .page_type = LocalIndexerScheduler::Task::PageType::ColumnFileTiny, - .page_ids = {1}, + .file_ids = {LocalIndexerScheduler::ColumnFileTinyID(1)}, .request_memory = 0, .workload = [&]() { task_2_is_started->set_value(); }, }); From c44eeeacacf9dbd50b0632ea0fb77d8d71c69ca8 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Fri, 30 Aug 2024 11:37:04 +0800 Subject: [PATCH 4/7] address comments Signed-off-by: Lloyd-Pottiger --- .../Storages/DeltaMerge/LocalIndexerScheduler.cpp | 14 ++++++-------- .../Storages/DeltaMerge/LocalIndexerScheduler.h | 10 +++++----- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp index c29c4b1d945..6e2dff228bd 100644 --- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp @@ -110,7 +110,7 @@ void LocalIndexerScheduler::pushTask(const Task & task) const auto internal_task = std::make_shared(InternalTask{ .user_task = task, .created_at = Stopwatch(), - .scheduled_at = Stopwatch{}, // Not scheduled + .scheduled_at = Stopwatch(), // Not scheduled }); // Whether task is ready is undertermined. 
@@ -128,12 +128,10 @@ size_t LocalIndexerScheduler::dropTasks(KeyspaceID keyspace_id, TableID table_id
     size_t dropped_tasks = 0;
 
     std::unique_lock lock(mutex);
-    auto it = ready_tasks.find(keyspace_id);
-    if (it != ready_tasks.end())
+    if (auto it = ready_tasks.find(keyspace_id); it != ready_tasks.end())
     {
         auto & tasks_by_table = it->second;
-        auto table_it = tasks_by_table.find(table_id);
-        if (table_it != tasks_by_table.end())
+        if (auto table_it = tasks_by_table.find(table_id); table_it != tasks_by_table.end())
         {
             dropped_tasks += table_it->second.size();
             tasks_by_table.erase(table_it);
@@ -267,7 +265,6 @@ bool LocalIndexerScheduler::tryAddTaskToPool(std::unique_lock & lock
     }
     catch (...)
     {
-        // TODO: We should ensure ADD INDEX task is retried somewhere else?
         tryLogCurrentException(
             logger,
             fmt::format(
@@ -294,7 +291,8 @@ LocalIndexerScheduler::ScheduleResult LocalIndexerScheduler::scheduleNextTask(st
     if (ready_tasks.empty())
         return ScheduleResult::FAIL_NO_TASK;
 
-    // Find the keyspace ID which is just > last_schedule_keyspace_id.
+    // To be fair between different keyspaces,
+    // find the keyspace ID which is just > last_schedule_keyspace_id.
     auto keyspace_it = ready_tasks.upper_bound(last_schedule_keyspace_id);
     if (keyspace_it == ready_tasks.end())
         keyspace_it = ready_tasks.begin();
@@ -303,7 +301,7 @@ LocalIndexerScheduler::ScheduleResult LocalIndexerScheduler::scheduleNextTask(st
     auto & tasks_by_table = keyspace_it->second;
     RUNTIME_CHECK(!tasks_by_table.empty());
 
-    TableID last_schedule_table_id = -1;
+    TableID last_schedule_table_id = InvalidTableID;
     if (last_schedule_table_id_by_ks.find(keyspace_id) != last_schedule_table_id_by_ks.end())
         last_schedule_table_id = last_schedule_table_id_by_ks[keyspace_id];
 
diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
index 0ef1cbeb7b8..8e91b01f4ea 100644
--- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
+++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
@@ -50,16 +50,16 @@ class LocalIndexerScheduler
 
     struct Task
     {
-        // Note: The scheduler will try to schedule farely according to keyspace_id and table_id.
-        KeyspaceID keyspace_id;
-        TableID table_id;
+        // Note: The scheduler will try to schedule fairly according to keyspace_id and table_id.
+        const KeyspaceID keyspace_id;
+        const TableID table_id;
 
         // The file id of the ColumnFileTiny or DMFile.
         // Used for the scheduler to avoid concurrently adding index for the same file.
-        std::vector file_ids;
+        const std::vector file_ids;
 
         // Used for the scheduler to control the maximum requested memory usage.
-        size_t request_memory;
+        const size_t request_memory;
 
         // The actual index setup workload.
         // The scheduler does not care about the workload.
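[Editor's note] The fairness comment added in PATCH 4/7 above boils down to a wrap-around upper_bound scan over an ordered map. The following standalone sketch is not part of the patch series; the type alias, template parameter, and function name are illustrative only:

#include <cstdint>
#include <map>

using KeyspaceID = uint32_t;

// Round-robin pick: return the smallest keyspace ID strictly greater than
// `last`, wrapping back to the first entry when `last` was already the
// largest key. Precondition: `tasks` must be non-empty.
template <typename TaskQueue>
KeyspaceID pickNextKeyspace(const std::map<KeyspaceID, TaskQueue> & tasks, KeyspaceID last)
{
    // upper_bound skips every key <= last, so each keyspace gets a turn
    // before any keyspace is scheduled twice.
    auto it = tasks.upper_bound(last);
    if (it == tasks.end())
        it = tasks.begin(); // wrap around to the smallest keyspace ID
    return it->first;
}

Feeding the returned ID back in as `last` on the next call cycles through all keyspaces, no matter how unevenly the tasks are distributed among them.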
From 2a8f66ff100e546734f073d03ffb3a086a184d5e Mon Sep 17 00:00:00 2001
From: Lloyd-Pottiger
Date: Fri, 30 Aug 2024 12:00:08 +0800
Subject: [PATCH 5/7] address comments

Signed-off-by: Lloyd-Pottiger
---
 dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp | 3 ++-
 dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h   | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
index 6e2dff228bd..da7e3b642a6 100644
--- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
+++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
@@ -305,7 +305,8 @@ LocalIndexerScheduler::ScheduleResult LocalIndexerScheduler::scheduleNextTask(st
     if (last_schedule_table_id_by_ks.find(keyspace_id) != last_schedule_table_id_by_ks.end())
         last_schedule_table_id = last_schedule_table_id_by_ks[keyspace_id];
 
-    auto table_it = tasks_by_table.upper_bound(last_schedule_table_id);
+    // Try to finish all tasks in the last table before moving to the next table.
+    auto table_it = tasks_by_table.find(last_schedule_table_id);
     if (table_it == tasks_by_table.end())
         table_it = tasks_by_table.begin();
     const TableID table_id = table_it->first;
diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
index 8e91b01f4ea..b9aca8cf7d6 100644
--- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
+++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
@@ -186,7 +186,7 @@ class LocalIndexerScheduler
     size_t pool_current_memory = 0;
     size_t all_tasks_count = 0; // ready_tasks + unready_tasks
 
-    /// Schedule farely according to keyspace_id, and then according to table_id.
+    /// Schedule fairly according to keyspace_id, and then according to table_id.
     std::map>> ready_tasks{};
     /// When the scheduler will stop waiting and try to schedule again?
     /// 1. When a new task is added (and pool is not full)

From e976f4fb49be1db1d97b8b5feec9f141da58a20b Mon Sep 17 00:00:00 2001
From: Lloyd-Pottiger
Date: Tue, 3 Sep 2024 16:14:43 +0800
Subject: [PATCH 6/7] address comments

Signed-off-by: Lloyd-Pottiger
---
 dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp | 9 ++++++---
 dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h   | 2 ++
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
index da7e3b642a6..8153779a902 100644
--- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
+++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
@@ -13,9 +13,11 @@
 // limitations under the License.
 
 #include
+#include
 #include
 #include
 
+
 namespace DB::DM
 {
 
@@ -306,7 +308,7 @@ LocalIndexerScheduler::ScheduleResult LocalIndexerScheduler::scheduleNextTask(st
         last_schedule_table_id = last_schedule_table_id_by_ks[keyspace_id];
 
     // Try to finish all tasks in the last table before moving to the next table.
-    auto table_it = tasks_by_table.find(last_schedule_table_id);
+    auto table_it = tasks_by_table.lower_bound(last_schedule_table_id);
     if (table_it == tasks_by_table.end())
         table_it = tasks_by_table.begin();
     const TableID table_id = table_it->first;
@@ -348,8 +350,7 @@ LocalIndexerScheduler::ScheduleResult LocalIndexerScheduler::scheduleNextTask(st
         return ScheduleResult::RETRY;
     }
 
-    auto ok = tryAddTaskToPool(lock, task);
-    if (!ok)
+    if (!tryAddTaskToPool(lock, task))
         // The pool is full. May be memory limit reached or concurrent task limit reached.
         // We will not try any more tasks.
         // At next retry, we will continue using this Keyspace+Table and try next task.
@@ -365,6 +366,8 @@ LocalIndexerScheduler::ScheduleResult LocalIndexerScheduler::scheduleNextTask(st
 
 void LocalIndexerScheduler::schedulerLoop()
 {
+    setThreadName("LocalIndexScheduler-SchedulerLoop");
+
     while (true)
     {
         if (is_shutting_down)
diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
index b9aca8cf7d6..d72666377de 100644
--- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
+++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
@@ -107,6 +107,8 @@ class LocalIndexerScheduler
 
     /**
      * @brief Push a task to the pool. The task may not be scheduled immediately.
+     * The same task may be pushed multiple times, but identical tasks are not allowed to execute concurrently.
+     * If the task's request_memory is larger than the memory_limit, an exception is thrown.
      */
     void pushTask(const Task & task);
 

From c217be8c07667db8716f5e1afe0288390bab25a2 Mon Sep 17 00:00:00 2001
From: Lloyd-Pottiger
Date: Tue, 3 Sep 2024 20:48:59 +0800
Subject: [PATCH 7/7] address comments

Signed-off-by: Lloyd-Pottiger
---
 dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
index 8153779a902..5abb0d414f2 100644
--- a/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
+++ b/dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
@@ -366,7 +366,7 @@ LocalIndexerScheduler::ScheduleResult LocalIndexerScheduler::scheduleNextTask(st
 
 void LocalIndexerScheduler::schedulerLoop()
 {
-    setThreadName("LocalIndexScheduler-SchedulerLoop");
+    setThreadName("LocalIndexSched");
 
     while (true)
     {
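[Editor's note] Taken together, the patches above pin down the scheduler's usage contract. The sketch below is assembled from the tests in this series; the option values, the request_memory figure, and the workload body are illustrative, not taken from the patches:

// A usage sketch, not part of the patch series.
auto scheduler = LocalIndexerScheduler::create({
    .pool_size = 4,
    .memory_limit = 100, // total bytes the pool may hand out; 0 disables the cap
    .auto_start = true,
});

// Tasks naming the same file ID are serialized: a second task that lists
// DMFileID(1) will not run until the first one finishes. A task on
// ColumnFileTinyID(1) is a different key, so it is not blocked.
scheduler->pushTask({
    .keyspace_id = 1,
    .table_id = 1,
    .file_ids = {LocalIndexerScheduler::DMFileID(1)},
    .request_memory = 10, // must not exceed memory_limit, or pushTask throws
    .workload = [] { /* build the local index here */ },
});

scheduler->waitForFinish();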