diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
index 66f969127a0..360d732c1ac 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
@@ -39,16 +39,15 @@ SegmentReadTaskScheduler::~SegmentReadTaskScheduler()
 }
 
 void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
+{
+    // Serialize callers of `add` so that `schedule` does not always fail to acquire `pending_mtx`.
+    std::lock_guard lock(add_mtx);
+    submitPendingPool(pool);
+}
+
+void SegmentReadTaskScheduler::addPool(const SegmentReadTaskPoolPtr & pool)
 {
     assert(pool != nullptr);
-    Stopwatch sw_add;
-    // `add_lock` is only used in this function to make all threads calling `add` to execute serially.
-    std::lock_guard add_lock(add_mtx);
-    add_waittings.fetch_add(1, std::memory_order_relaxed);
-    // `lock` is used to protect data.
-    std::lock_guard lock(mtx);
-    add_waittings.fetch_sub(1, std::memory_order_relaxed);
-    Stopwatch sw_do_add;
     read_pools.emplace(pool->pool_id, pool);
 
     const auto & tasks = pool->getTasks();
@@ -56,15 +55,43 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
     {
         merging_segments[seg_id].push_back(pool->pool_id);
     }
+}
+
+void SegmentReadTaskScheduler::submitPendingPool(SegmentReadTaskPoolPtr pool)
+{
+    assert(pool != nullptr);
+    if (pool->getPendingSegmentCount() <= 0)
+    {
+        LOG_INFO(pool->getLogger(), "Ignored for no segment to read, pool_id={}", pool->pool_id);
+        return;
+    }
+    Stopwatch sw;
+    std::lock_guard lock(pending_mtx);
+    pending_pools.push_back(pool);
     LOG_INFO(
         pool->getLogger(),
-        "Added, pool_id={} block_slots={} segment_count={} pool_count={} cost={:.3f}us do_add_cost={:.3f}us", //
+        "Submitted, pool_id={} segment_count={} pending_pools={} cost={}ns",
         pool->pool_id,
-        pool->getFreeBlockSlots(),
-        tasks.size(),
-        read_pools.size(),
-        sw_add.elapsed() / 1000.0,
-        sw_do_add.elapsed() / 1000.0);
+        pool->getPendingSegmentCount(),
+        pending_pools.size(),
+        sw.elapsed());
+}
+
+void SegmentReadTaskScheduler::reapPendingPools()
+{
+    SegmentReadTaskPools pools;
+    {
+        std::lock_guard lock(pending_mtx);
+        pools.swap(pending_pools);
+    }
+    if (!pools.empty())
+    {
+        for (const auto & pool : pools)
+        {
+            addPool(pool);
+        }
+        LOG_INFO(log, "Added, pool_ids={}, pool_count={}", pools, read_pools.size());
+    }
 }
 
 MergedTaskPtr SegmentReadTaskScheduler::scheduleMergedTask(SegmentReadTaskPoolPtr & pool)
@@ -243,49 +270,43 @@ std::tuple<UInt64, UInt64, UInt64> SegmentReadTaskScheduler::scheduleOneRound()
 
 bool SegmentReadTaskScheduler::schedule()
 {
-    Stopwatch sw_sched_total;
-    std::lock_guard lock(mtx);
-    Stopwatch sw_do_sched;
-
-    auto pool_count = read_pools.size();
+    Stopwatch sw_sched;
     UInt64 erased_pool_count = 0;
     UInt64 sched_null_count = 0;
     UInt64 sched_succ_count = 0;
     UInt64 sched_round = 0;
     bool can_sched_more_tasks = false;
+    UInt64 reap_pending_pools_ns = 0;
    do
    {
        ++sched_round;
+        Stopwatch sw;
+        reapPendingPools();
+        reap_pending_pools_ns += sw.elapsed();
        auto [erase, null, succ] = scheduleOneRound();
        erased_pool_count += erase;
        sched_null_count += null;
        sched_succ_count += succ;
        can_sched_more_tasks = succ > 0 && !read_pools.empty();
-        // If no thread is waitting to add tasks and there are some tasks to be scheduled, run scheduling again.
-        // Avoid releasing and acquiring `mtx` repeatly.
-        // This is common when query concurrency is low, but individual queries are heavy.
-    } while (add_waittings.load(std::memory_order_relaxed) <= 0 && can_sched_more_tasks);
+    } while (can_sched_more_tasks);
 
     if (read_pools.empty())
     {
         GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_pool).Increment();
     }
-    auto total_ms = sw_sched_total.elapsedMilliseconds();
-    if (total_ms >= 100)
+    if (auto total_ms = sw_sched.elapsedMilliseconds(); total_ms >= 50)
     {
         LOG_INFO(
             log,
-            "schedule sched_round={} pool_count={} erased_pool_count={} sched_null_count={} sched_succ_count={} "
-            "cost={}ms do_sched_cost={}ms",
+            "schedule sched_round={} erased_pool_count={} sched_null_count={} sched_succ_count={} reap={}ms cost={}ms",
             sched_round,
-            pool_count,
             erased_pool_count,
             sched_null_count,
             sched_succ_count,
-            total_ms,
-            sw_do_sched.elapsedMilliseconds());
+            reap_pending_pools_ns / 1'000'000,
+            total_ms);
     }
     return can_sched_more_tasks;
 }
diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
index 085b3d63dcc..621d1cec7c1 100644
--- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
+++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
@@ -23,15 +23,18 @@ struct Settings;
 
 namespace DB::DM
 {
-
-// `SegmentReadTaskScheduler` is a global singleton. All `SegmentReadTaskPool` will be added to it and be scheduled by it.
-// 1. `UnorderedInputStream`/`UnorderedSourceOps` will call `SegmentReadTaskScheduler::add` to add a `SegmentReadTaskPool`
-// object to the `read_pools` list and index segments information into `merging_segments`.
-// 2. A schedule-thread will scheduling read tasks:
-// a. It scans the `read_pools` list and check if `SegmentReadTaskPool` need be scheduled.
-// b. Chooses a `SegmentReadTask` of the `SegmentReadTaskPool`, if other `SegmentReadTaskPool` will read the same
-// `SegmentReadTask`, pop them, and build a `MergedTask`.
-// c. Sends the MergedTask to read threads(SegmentReader).
+namespace tests
+{
+class SegmentReadTasksPoolTest;
+}
+// SegmentReadTaskScheduler is a global singleton. All SegmentReadTaskPool objects are added to it and scheduled by it.
+// - Threads of the compute layer call SegmentReadTaskScheduler::add to add a SegmentReadTaskPool object to `pending_pools`.
+//   - Call path: UnorderedInputStream/UnorderedSourceOps -> SegmentReadTaskScheduler::add -> SegmentReadTaskScheduler::submitPendingPool
+//
+// - `sched_thread` schedules the read tasks.
+//   - Call path: schedLoop -> schedule -> reapPendingPools -> scheduleOneRound
+//   - reapPendingPools swaps out `pending_pools` and adds these pools to `read_pools` and `merging_segments`.
+//   - scheduleOneRound scans `read_pools` and chooses segments to read.
 class SegmentReadTaskScheduler
 {
 public:
@@ -44,8 +47,8 @@ class SegmentReadTaskScheduler
     ~SegmentReadTaskScheduler();
     DISALLOW_COPY_AND_MOVE(SegmentReadTaskScheduler);
 
-    // Add SegmentReadTaskPool to `read_pools` and index segments into merging_segments.
-    void add(const SegmentReadTaskPoolPtr & pool) LOCKS_EXCLUDED(add_mtx, mtx);
+    // Add `pool` to `pending_pools`.
+    void add(const SegmentReadTaskPoolPtr & pool);
 
     void pushMergedTask(const MergedTaskPtr & p) { merged_task_pool.push(p); }
 
@@ -69,27 +72,31 @@ class SegmentReadTaskScheduler
     // `erased_pool_count` - how many stale pools have beed erased.
     // `sched_null_count` - how many pools do not require scheduling.
     // `sched_succ_count` - how many pools is scheduled.
-    std::tuple<UInt64, UInt64, UInt64> scheduleOneRound() EXCLUSIVE_LOCKS_REQUIRED(mtx);
+    std::tuple<UInt64, UInt64, UInt64> scheduleOneRound();
 
     // `schedule()` calls `scheduleOneRound()` in a loop
     // until there are no tasks to schedule or need to release lock to other tasks.
-    bool schedule() LOCKS_EXCLUDED(mtx);
+    bool schedule();
 
     // `schedLoop()` calls `schedule()` in infinite loop.
-    void schedLoop() LOCKS_EXCLUDED(mtx);
+    void schedLoop();
 
-    MergedTaskPtr scheduleMergedTask(SegmentReadTaskPoolPtr & pool) EXCLUSIVE_LOCKS_REQUIRED(mtx);
+    MergedTaskPtr scheduleMergedTask(SegmentReadTaskPoolPtr & pool);
 
     // Returns <seg_id, pool_ids>.
     std::optional<std::pair<GlobalSegmentID, std::vector<UInt64>>> scheduleSegmentUnlock(
-        const SegmentReadTaskPoolPtr & pool) EXCLUSIVE_LOCKS_REQUIRED(mtx);
-    SegmentReadTaskPools getPoolsUnlock(const std::vector<UInt64> & pool_ids) EXCLUSIVE_LOCKS_REQUIRED(mtx);
+        const SegmentReadTaskPoolPtr & pool);
+    SegmentReadTaskPools getPoolsUnlock(const std::vector<UInt64> & pool_ids);
+
+    void submitPendingPool(SegmentReadTaskPoolPtr pool);
+    void reapPendingPools();
+    void addPool(const SegmentReadTaskPoolPtr & pool);
 
     // To restrict the instantaneous concurrency of `add` and avoid `schedule` from always failing to acquire the lock.
-    std::mutex add_mtx ACQUIRED_BEFORE(mtx);
+    std::mutex add_mtx;
 
-    std::mutex mtx;
+    // `read_pools` and `merging_segments` are only accessed by `sched_thread`.
     // pool_id -> pool
-    std::unordered_map<UInt64, SegmentReadTaskPoolPtr> read_pools GUARDED_BY(mtx);
+    std::unordered_map<UInt64, SegmentReadTaskPoolPtr> read_pools;
     // GlobalSegmentID -> pool_ids
-    MergingSegments merging_segments GUARDED_BY(mtx);
+    MergingSegments merging_segments;
 
     MergedTaskPool merged_task_pool;
 
@@ -99,7 +106,9 @@ class SegmentReadTaskScheduler
 
     LoggerPtr log;
 
-    // To count how many threads are waitting to add tasks.
-    std::atomic<Int64> add_waittings{0};
+    std::mutex pending_mtx;
+    SegmentReadTaskPools pending_pools GUARDED_BY(pending_mtx);
+
+    friend class tests::SegmentReadTasksPoolTest;
 };
 
 } // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
index dfcddc1eed4..b9557a99e6a 100644
--- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
+++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
@@ -126,7 +126,7 @@ class SegmentReadTaskPool
         auto blk_avg_bytes = total_count > 0 ? total_bytes / total_count : 0;
         auto approximate_max_pending_block_bytes = blk_avg_bytes * max_queue_size;
         auto total_rows = blk_stat.totalRows();
-        LOG_DEBUG(
+        LOG_INFO(
             log,
             "Done. pool_id={} pop={} pop_empty={} pop_empty_ratio={} "
             "max_queue_size={} blk_avg_bytes={} approximate_max_pending_block_bytes={:.2f}MB "
@@ -255,3 +255,15 @@ using SegmentReadTaskPoolPtr = std::shared_ptr<SegmentReadTaskPool>;
 using SegmentReadTaskPools = std::vector<SegmentReadTaskPoolPtr>;
 
 } // namespace DB::DM
+
+template <>
+struct fmt::formatter<DB::DM::SegmentReadTaskPoolPtr>
+{
+    static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); }
+
+    template <typename FormatContext>
+    auto format(const DB::DM::SegmentReadTaskPoolPtr & pool, FormatContext & ctx) const
+    {
+        return fmt::format_to(ctx.out(), "{}", pool->pool_id);
+    }
+};
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp
index 9057c310c29..1e832dd1e11 100644
--- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp
+++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp
@@ -107,95 +107,43 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic
             /*res_group_name_*/ String{});
     }
 
-    inline static const std::vector<PageIdU64> test_seg_ids{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
-};
-
-TEST_F(SegmentReadTasksPoolTest, UnorderedWrapper)
-{
-    SegmentReadTasksWrapper tasks_wrapper(true, createSegmentReadTasks(test_seg_ids));
-
-    bool exception_happened = false;
-    try
-    {
-        tasks_wrapper.nextTask();
-    }
-    catch (const Exception & e)
-    {
-        exception_happened = true;
-    }
-    ASSERT_TRUE(exception_happened);
-
-    ASSERT_FALSE(tasks_wrapper.empty());
-    const auto & tasks = tasks_wrapper.getTasks();
-    ASSERT_EQ(tasks.size(), test_seg_ids.size());
-
-    std::random_device rd;
-    std::mt19937 g(rd());
-    std::vector<PageIdU64> v = test_seg_ids;
-    std::shuffle(v.begin(), v.end(), g);
-    for (PageIdU64 seg_id : v)
-    {
-        auto global_seg_id = createGlobalSegmentID(seg_id);
-        auto task = tasks_wrapper.getTask(global_seg_id);
-        ASSERT_NE(task, nullptr);
-        ASSERT_EQ(task->segment->segmentId(), seg_id);
-        task = tasks_wrapper.getTask(global_seg_id);
-        ASSERT_EQ(task, nullptr);
-    }
-    ASSERT_TRUE(tasks_wrapper.empty());
-}
-
-TEST_F(SegmentReadTasksPoolTest, OrderedWrapper)
-{
-    SegmentReadTasksWrapper tasks_wrapper(false, createSegmentReadTasks(test_seg_ids));
-
-    bool exception_happened = false;
-    try
-    {
-        tasks_wrapper.getTasks();
-    }
-    catch (const Exception & e)
+    void schedulerBasic()
     {
-        exception_happened = true;
-    }
-    ASSERT_TRUE(exception_happened);
-
-    ASSERT_FALSE(tasks_wrapper.empty());
+        SegmentReadTaskScheduler scheduler{false};
 
-    for (PageIdU64 seg_id : test_seg_ids)
-    {
-        auto task = tasks_wrapper.nextTask();
-        ASSERT_EQ(task->segment->segmentId(), seg_id);
-    }
-    ASSERT_TRUE(tasks_wrapper.empty());
-    ASSERT_EQ(tasks_wrapper.nextTask(), nullptr);
-}
-
-TEST_F(SegmentReadTasksPoolTest, SchedulerBasic)
-{
-    SegmentReadTaskScheduler scheduler{false};
-
-    {
-        // Create and add pool.
+        // Create
         auto pool = createSegmentReadTaskPool(test_seg_ids);
         pool->increaseUnorderedInputStreamRefCount();
+        ASSERT_EQ(pool->getPendingSegmentCount(), test_seg_ids.size());
+
+        // Submit to pending_pools
         scheduler.add(pool);
+        {
+            std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnings
+            ASSERT_EQ(scheduler.pending_pools.size(), 1);
+        }
+        ASSERT_EQ(scheduler.read_pools.size(), 0);
+
+        // Reap the pending_pools
+        scheduler.reapPendingPools();
+        {
+            std::lock_guard lock(scheduler.pending_mtx); // Disable TSA warnings
+            ASSERT_EQ(scheduler.pending_pools.size(), 0);
+        }
+        ASSERT_EQ(scheduler.read_pools.size(), 1);
 
-        // Schedule segment to reach limitation.
+        // Schedule segment to reach limitation
         auto active_segment_limits = pool->getFreeActiveSegments();
         ASSERT_GT(active_segment_limits, 0);
         std::vector<MergedTaskPtr> merged_tasks;
         for (int i = 0; i < active_segment_limits; ++i)
         {
-            std::lock_guard lock(scheduler.mtx);
             auto merged_task = scheduler.scheduleMergedTask(pool);
             ASSERT_NE(merged_task, nullptr);
             merged_tasks.push_back(merged_task);
         }
-        {
-            std::lock_guard lock(scheduler.mtx);
-            ASSERT_EQ(scheduler.scheduleMergedTask(pool), nullptr);
-        }
+        ASSERT_EQ(scheduler.scheduleMergedTask(pool), nullptr);
 
         // Make a segment finished.
         {
@@ -237,7 +185,6 @@ TEST_F(SegmentReadTasksPoolTest, SchedulerBasic)
 
         for (;;)
         {
-            std::lock_guard lock(scheduler.mtx);
             auto merged_task = scheduler.scheduleMergedTask(pool);
             if (merged_task == nullptr)
             {
@@ -254,6 +201,76 @@ TEST_F(SegmentReadTasksPoolTest, SchedulerBasic)
             ASSERT_FALSE(pool->valid());
         }
     }
+
+    inline static const std::vector<PageIdU64> test_seg_ids{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+};
+
+TEST_F(SegmentReadTasksPoolTest, UnorderedWrapper)
+{
+    SegmentReadTasksWrapper tasks_wrapper(true, createSegmentReadTasks(test_seg_ids));
+
+    bool exception_happened = false;
+    try
+    {
+        tasks_wrapper.nextTask();
+    }
+    catch (const Exception & e)
+    {
+        exception_happened = true;
+    }
+    ASSERT_TRUE(exception_happened);
+
+    ASSERT_FALSE(tasks_wrapper.empty());
+    const auto & tasks = tasks_wrapper.getTasks();
+    ASSERT_EQ(tasks.size(), test_seg_ids.size());
+
+    std::random_device rd;
+    std::mt19937 g(rd());
+    std::vector<PageIdU64> v = test_seg_ids;
+    std::shuffle(v.begin(), v.end(), g);
+    for (PageIdU64 seg_id : v)
+    {
+        auto global_seg_id = createGlobalSegmentID(seg_id);
+        auto task = tasks_wrapper.getTask(global_seg_id);
+        ASSERT_NE(task, nullptr);
+        ASSERT_EQ(task->segment->segmentId(), seg_id);
+        task = tasks_wrapper.getTask(global_seg_id);
+        ASSERT_EQ(task, nullptr);
+    }
+    ASSERT_TRUE(tasks_wrapper.empty());
+}
+
+TEST_F(SegmentReadTasksPoolTest, OrderedWrapper)
+{
+    SegmentReadTasksWrapper tasks_wrapper(false, createSegmentReadTasks(test_seg_ids));
+
+    bool exception_happened = false;
+    try
+    {
+        tasks_wrapper.getTasks();
+    }
+    catch (const Exception & e)
+    {
+        exception_happened = true;
+    }
+    ASSERT_TRUE(exception_happened);
+
+    ASSERT_FALSE(tasks_wrapper.empty());
+
+    for (PageIdU64 seg_id : test_seg_ids)
+    {
+        auto task = tasks_wrapper.nextTask();
+        ASSERT_EQ(task->segment->segmentId(), seg_id);
+    }
+    ASSERT_TRUE(tasks_wrapper.empty());
+    ASSERT_EQ(tasks_wrapper.nextTask(), nullptr);
+}
+
+TEST_F(SegmentReadTasksPoolTest, SchedulerBasic)
+try
+{
+    schedulerBasic();
+}
+CATCH
 
-} // namespace DB::DM::tests
+} // namespace DB::DM::tests
\ No newline at end of file
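
Reviewer note (not part of the patch): the core of this change is the submit/reap
hand-off. `add` only appends the pool to `pending_pools` under the lightweight
`pending_mtx`, and the scheduler thread periodically swaps the whole vector out,
then indexes the pools into `read_pools`/`merging_segments` without holding any
shared lock. A minimal standalone sketch of the pattern (illustrative names, not
the TiFlash types):

#include <cstdio>
#include <mutex>
#include <vector>

class PendingQueue
{
public:
    // Producer side: the critical section is a single push_back.
    void submit(int pool_id)
    {
        std::lock_guard lock(mtx);
        pending.push_back(pool_id);
    }

    // Consumer side: steal the whole batch in O(1), process it unlocked.
    std::vector<int> reap()
    {
        std::vector<int> reaped;
        {
            std::lock_guard lock(mtx);
            reaped.swap(pending);
        }
        return reaped; // caller handles these without blocking producers
    }

private:
    std::mutex mtx;
    std::vector<int> pending;
};

int main()
{
    PendingQueue q;
    q.submit(1);
    q.submit(2);
    for (int id : q.reap())
        std::printf("reaped pool %d\n", id);
}

Because the swap keeps the critical section constant-time, `schedule` can no
longer be starved by a stream of concurrent `add` calls, which is what the
removed `add_waittings` counter used to work around.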
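Reviewer note (not part of the patch): `LOG_INFO(log, "Added, pool_ids={}, pool_count={}", pools, ...)`
in reapPendingPools formats a whole SegmentReadTaskPools vector. This works because fmt
formats ranges element-wise once the element type has a fmt::formatter specialization,
which the new specialization in SegmentReadTaskPool.h supplies. A self-contained sketch
of the same mechanism, with hypothetical stand-in Pool/PoolPtr types (assumes fmt with
<fmt/ranges.h> available):

#include <fmt/ranges.h>

#include <cstdint>
#include <memory>
#include <vector>

// Stand-ins for SegmentReadTaskPool / SegmentReadTaskPoolPtr.
struct Pool
{
    uint64_t pool_id;
};
using PoolPtr = std::shared_ptr<Pool>;

// Same shape as the specialization added above: print a pool pointer as its id.
template <>
struct fmt::formatter<PoolPtr>
{
    static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); }

    template <typename FormatContext>
    auto format(const PoolPtr & pool, FormatContext & ctx) const
    {
        return fmt::format_to(ctx.out(), "{}", pool->pool_id);
    }
};

int main()
{
    std::vector<PoolPtr> pools{std::make_shared<Pool>(Pool{1}), std::make_shared<Pool>(Pool{2})};
    fmt::print("pool_ids={}\n", pools); // prints: pool_ids=[1, 2]
}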