Storages: Refine SegmentReadTaskScheduler::add to reduce lock contention (#9027) (#9111)

close #9024

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: JaySon-Huang <[email protected]>

Co-authored-by: jinhelin <[email protected]>
Co-authored-by: JaySon-Huang <[email protected]>
3 people authored Jun 4, 2024
1 parent 8e8e369 commit 4951e18
Showing 4 changed files with 188 additions and 129 deletions.
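
At its core, the commit replaces the old approach, in which add() had to acquire the scheduler's data mutex, with a submit/reap scheme: reader threads only append a pool to a small pending list under a dedicated mutex, while the scheduler thread periodically swaps the whole list out in one short critical section and processes it afterwards without holding any lock. A minimal, hypothetical sketch of that pattern (simplified names, not the actual TiFlash code):

    #include <mutex>
    #include <vector>

    // Producers call submit(); the single scheduler thread calls reap() once per round.
    class PendingQueue
    {
    public:
        void submit(int pool_id)
        {
            std::lock_guard lock(pending_mtx); // held only for one push_back
            pending.push_back(pool_id);
        }

        std::vector<int> reap()
        {
            std::vector<int> reaped;
            {
                std::lock_guard lock(pending_mtx); // held only for one swap
                reaped.swap(pending);
            }
            return reaped; // processed afterwards with no lock held
        }

    private:
        std::mutex pending_mtx;
        std::vector<int> pending;
    };

In the diff below, submitPendingPool() plays the role of submit(), reapPendingPools() plays the role of reap(), and scheduleOneRound() can then work on read_pools without a mutex because only sched_thread touches it.
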
@@ -39,32 +39,59 @@ SegmentReadTaskScheduler::~SegmentReadTaskScheduler()
 }

 void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
 {
+    // To avoid schedule from always failing to acquire the pending_mtx.
+    std::lock_guard lock(add_mtx);
+    submitPendingPool(pool);
+}
+
+void SegmentReadTaskScheduler::addPool(const SegmentReadTaskPoolPtr & pool)
+{
     assert(pool != nullptr);
-    Stopwatch sw_add;
-    // `add_lock` is only used in this function to make all threads calling `add` to execute serially.
-    std::lock_guard add_lock(add_mtx);
-    add_waittings.fetch_add(1, std::memory_order_relaxed);
-    // `lock` is used to protect data.
-    std::lock_guard lock(mtx);
-    add_waittings.fetch_sub(1, std::memory_order_relaxed);
-    Stopwatch sw_do_add;
     read_pools.emplace(pool->pool_id, pool);

     const auto & tasks = pool->getTasks();
     for (const auto & [seg_id, task] : tasks)
     {
         merging_segments[seg_id].push_back(pool->pool_id);
     }
+}
+
+void SegmentReadTaskScheduler::submitPendingPool(SegmentReadTaskPoolPtr pool)
+{
+    assert(pool != nullptr);
+    if (pool->getPendingSegmentCount() <= 0)
+    {
+        LOG_INFO(pool->getLogger(), "Ignored for no segment to read, pool_id={}", pool->pool_id);
+        return;
+    }
+    Stopwatch sw;
+    std::lock_guard lock(pending_mtx);
+    pending_pools.push_back(pool);
     LOG_INFO(
         pool->getLogger(),
-        "Added, pool_id={} block_slots={} segment_count={} pool_count={} cost={:.3f}us do_add_cost={:.3f}us", //
+        "Submitted, pool_id={} segment_count={} pending_pools={} cost={}ns",
         pool->pool_id,
-        pool->getFreeBlockSlots(),
-        tasks.size(),
-        read_pools.size(),
-        sw_add.elapsed() / 1000.0,
-        sw_do_add.elapsed() / 1000.0);
+        pool->getPendingSegmentCount(),
+        pending_pools.size(),
+        sw.elapsed());
 }

+void SegmentReadTaskScheduler::reapPendingPools()
+{
+    SegmentReadTaskPools pools;
+    {
+        std::lock_guard lock(pending_mtx);
+        pools.swap(pending_pools);
+    }
+    if (!pools.empty())
+    {
+        for (const auto & pool : pools)
+        {
+            addPool(pool);
+        }
+        LOG_INFO(log, "Added, pool_ids={}, pool_count={}", pools, read_pools.size());
+    }
+}
+
 MergedTaskPtr SegmentReadTaskScheduler::scheduleMergedTask(SegmentReadTaskPoolPtr & pool)
@@ -243,49 +270,43 @@ std::tuple<UInt64, UInt64, UInt64> SegmentReadTaskScheduler::scheduleOneRound()

 bool SegmentReadTaskScheduler::schedule()
 {
-    Stopwatch sw_sched_total;
-    std::lock_guard lock(mtx);
-    Stopwatch sw_do_sched;
-
-    auto pool_count = read_pools.size();
+    Stopwatch sw_sched;
     UInt64 erased_pool_count = 0;
     UInt64 sched_null_count = 0;
     UInt64 sched_succ_count = 0;
     UInt64 sched_round = 0;
     bool can_sched_more_tasks = false;
+    UInt64 reap_pending_pools_ns = 0;
     do
     {
         ++sched_round;
+        Stopwatch sw;
+        reapPendingPools();
+        reap_pending_pools_ns += sw.elapsed();
         auto [erase, null, succ] = scheduleOneRound();
         erased_pool_count += erase;
         sched_null_count += null;
         sched_succ_count += succ;

         can_sched_more_tasks = succ > 0 && !read_pools.empty();
-        // If no thread is waitting to add tasks and there are some tasks to be scheduled, run scheduling again.
-        // Avoid releasing and acquiring `mtx` repeatly.
-        // This is common when query concurrency is low, but individual queries are heavy.
-    } while (add_waittings.load(std::memory_order_relaxed) <= 0 && can_sched_more_tasks);
+    } while (can_sched_more_tasks);

     if (read_pools.empty())
     {
         GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_pool).Increment();
     }

-    auto total_ms = sw_sched_total.elapsedMilliseconds();
-    if (total_ms >= 100)
+    if (auto total_ms = sw_sched.elapsedMilliseconds(); total_ms >= 50)
     {
         LOG_INFO(
             log,
-            "schedule sched_round={} pool_count={} erased_pool_count={} sched_null_count={} sched_succ_count={} "
-            "cost={}ms do_sched_cost={}ms",
+            "schedule sched_round={} erased_pool_count={} sched_null_count={} sched_succ_count={} reap={}ms cost={}ms",
             sched_round,
-            pool_count,
             erased_pool_count,
             sched_null_count,
             sched_succ_count,
-            total_ms,
-            sw_do_sched.elapsedMilliseconds());
+            reap_pending_pools_ns / 1000'000,
+            total_ms);
     }
     return can_sched_more_tasks;
 }
dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h (55 changes: 32 additions & 23 deletions)
@@ -23,15 +23,18 @@ struct Settings;

 namespace DB::DM
 {
-
-// `SegmentReadTaskScheduler` is a global singleton. All `SegmentReadTaskPool` will be added to it and be scheduled by it.
-// 1. `UnorderedInputStream`/`UnorderedSourceOps` will call `SegmentReadTaskScheduler::add` to add a `SegmentReadTaskPool`
-//    object to the `read_pools` list and index segments information into `merging_segments`.
-// 2. A schedule-thread will scheduling read tasks:
-//   a. It scans the `read_pools` list and check if `SegmentReadTaskPool` need be scheduled.
-//   b. Chooses a `SegmentReadTask` of the `SegmentReadTaskPool`, if other `SegmentReadTaskPool` will read the same
-//      `SegmentReadTask`, pop them, and build a `MergedTask`.
-//   c. Sends the MergedTask to read threads(SegmentReader).
+namespace tests
+{
+class SegmentReadTasksPoolTest;
+}
+// SegmentReadTaskScheduler is a global singleton. All SegmentReadTaskPool objects will be added to it and be scheduled by it.
+// - Threads of the computational layer will call SegmentReadTaskScheduler::add to add a SegmentReadTaskPool object to the `pending_pools`.
+//   - Call path: UnorderedInputStream/UnorderedSourceOps -> SegmentReadTaskScheduler::add -> SegmentReadTaskScheduler::submitPendingPool
+//
+// - `sched_thread` will schedule read tasks.
+//   - Call path: schedLoop -> schedule -> reapPendingPools -> scheduleOneRound
+//   - reapPendingPools will swap the `pending_pools` and add these pools to `read_pools` and `merging_segments`.
+//   - scheduleOneRound will scan `read_pools` and choose segments to read.
 class SegmentReadTaskScheduler
 {
 public:
@@ -44,8 +47,8 @@ class SegmentReadTaskScheduler
     ~SegmentReadTaskScheduler();
     DISALLOW_COPY_AND_MOVE(SegmentReadTaskScheduler);

-    // Add SegmentReadTaskPool to `read_pools` and index segments into merging_segments.
-    void add(const SegmentReadTaskPoolPtr & pool) LOCKS_EXCLUDED(add_mtx, mtx);
+    // Add `pool` to `pending_pools`.
+    void add(const SegmentReadTaskPoolPtr & pool);

     void pushMergedTask(const MergedTaskPtr & p) { merged_task_pool.push(p); }

@@ -69,27 +72,31 @@ class SegmentReadTaskScheduler
     // `erased_pool_count` - how many stale pools have been erased.
     // `sched_null_count` - how many pools do not require scheduling.
     // `sched_succ_count` - how many pools are scheduled.
-    std::tuple<UInt64, UInt64, UInt64> scheduleOneRound() EXCLUSIVE_LOCKS_REQUIRED(mtx);
+    std::tuple<UInt64, UInt64, UInt64> scheduleOneRound();
     // `schedule()` calls `scheduleOneRound()` in a loop
     // until there are no tasks to schedule or need to release lock to other tasks.
-    bool schedule() LOCKS_EXCLUDED(mtx);
+    bool schedule();
     // `schedLoop()` calls `schedule()` in infinite loop.
-    void schedLoop() LOCKS_EXCLUDED(mtx);
+    void schedLoop();

-    MergedTaskPtr scheduleMergedTask(SegmentReadTaskPoolPtr & pool) EXCLUSIVE_LOCKS_REQUIRED(mtx);
+    MergedTaskPtr scheduleMergedTask(SegmentReadTaskPoolPtr & pool);
     // Returns <seg_id, pool_ids>.
     std::optional<std::pair<GlobalSegmentID, std::vector<UInt64>>> scheduleSegmentUnlock(
-        const SegmentReadTaskPoolPtr & pool) EXCLUSIVE_LOCKS_REQUIRED(mtx);
-    SegmentReadTaskPools getPoolsUnlock(const std::vector<uint64_t> & pool_ids) EXCLUSIVE_LOCKS_REQUIRED(mtx);
+        const SegmentReadTaskPoolPtr & pool);
+    SegmentReadTaskPools getPoolsUnlock(const std::vector<uint64_t> & pool_ids);

+    void submitPendingPool(SegmentReadTaskPoolPtr pool);
+    void reapPendingPools();
+    void addPool(const SegmentReadTaskPoolPtr & pool);
+
     // To restrict the instantaneous concurrency of `add` and avoid `schedule` from always failing to acquire the lock.
-    std::mutex add_mtx ACQUIRED_BEFORE(mtx);
+    std::mutex add_mtx;

-    std::mutex mtx;
+    // `read_pools` and `merging_segments` are only accessed by `sched_thread`.
     // pool_id -> pool
-    std::unordered_map<UInt64, SegmentReadTaskPoolPtr> read_pools GUARDED_BY(mtx);
+    std::unordered_map<UInt64, SegmentReadTaskPoolPtr> read_pools;
     // GlobalSegmentID -> pool_ids
-    MergingSegments merging_segments GUARDED_BY(mtx);
+    MergingSegments merging_segments;

     MergedTaskPool merged_task_pool;

@@ -99,7 +106,9 @@ class SegmentReadTaskScheduler

     LoggerPtr log;

-    // To count how many threads are waitting to add tasks.
-    std::atomic<Int64> add_waittings{0};
+    std::mutex pending_mtx;
+    SegmentReadTaskPools pending_pools GUARDED_BY(pending_mtx);
+
+    friend class tests::SegmentReadTasksPoolTest;
 };
 } // namespace DB::DM
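
A side note on the GUARDED_BY, LOCKS_EXCLUDED, EXCLUSIVE_LOCKS_REQUIRED and ACQUIRED_BEFORE annotations that this header drops together with mtx: they expand to Clang Thread Safety Analysis attributes, which let a clang build with -Wthread-safety check lock discipline at compile time. A simplified, hypothetical sketch of the mechanism (not the TiFlash macro definitions):

    // Real projects annotate their own mutex wrapper so the analysis knows
    // which functions acquire and release the capability.
    #define CAPABILITY(x) __attribute__((capability(x)))
    #define ACQUIRE(...) __attribute__((acquire_capability(__VA_ARGS__)))
    #define RELEASE(...) __attribute__((release_capability(__VA_ARGS__)))
    #define GUARDED_BY(x) __attribute__((guarded_by(x)))
    #define EXCLUSIVE_LOCKS_REQUIRED(...) __attribute__((exclusive_locks_required(__VA_ARGS__)))

    class CAPABILITY("mutex") Mutex
    {
    public:
        void lock() ACQUIRE() {}
        void unlock() RELEASE() {}
    };

    class Counter
    {
    public:
        // Caller must already hold mtx; clang -Wthread-safety verifies call sites.
        void increment() EXCLUSIVE_LOCKS_REQUIRED(mtx) { ++value; }

        // Writing `value` here without holding mtx triggers a -Wthread-safety warning.
        void unsafeIncrement() { ++value; }

    private:
        Mutex mtx;
        int value GUARDED_BY(mtx) = 0;
    };
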
dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h (14 changes: 13 additions & 1 deletion)
@@ -126,7 +126,7 @@ class SegmentReadTaskPool
         auto blk_avg_bytes = total_count > 0 ? total_bytes / total_count : 0;
         auto approximate_max_pending_block_bytes = blk_avg_bytes * max_queue_size;
         auto total_rows = blk_stat.totalRows();
-        LOG_DEBUG(
+        LOG_INFO(
             log,
             "Done. pool_id={} pop={} pop_empty={} pop_empty_ratio={} "
             "max_queue_size={} blk_avg_bytes={} approximate_max_pending_block_bytes={:.2f}MB "
@@ -255,3 +255,15 @@ using SegmentReadTaskPoolPtr = std::shared_ptr<SegmentReadTaskPool>;
 using SegmentReadTaskPools = std::vector<SegmentReadTaskPoolPtr>;

 } // namespace DB::DM
+
+template <>
+struct fmt::formatter<DB::DM::SegmentReadTaskPoolPtr>
+{
+    static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); }
+
+    template <typename FormatContext>
+    auto format(const DB::DM::SegmentReadTaskPoolPtr & pool, FormatContext & ctx) const
+    {
+        return fmt::format_to(ctx.out(), "{}", pool->pool_id);
+    }
+};
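
The formatter specialization added above is what lets a whole container of pools be logged as a list of pool ids (as in the "Added, pool_ids={}" message in the scheduler). A small, self-contained sketch of the same idiom with a made-up Pool type, assuming fmt together with <fmt/ranges.h>:

    #include <cstdint>
    #include <memory>
    #include <vector>
    #include <fmt/core.h>
    #include <fmt/ranges.h>

    struct Pool
    {
        uint64_t pool_id;
    };
    using PoolPtr = std::shared_ptr<Pool>;

    // Format a shared_ptr<Pool> as just its id, mirroring the specialization above.
    template <>
    struct fmt::formatter<PoolPtr>
    {
        static constexpr auto parse(fmt::format_parse_context & ctx) { return ctx.begin(); }

        template <typename FormatContext>
        auto format(const PoolPtr & pool, FormatContext & ctx) const
        {
            return fmt::format_to(ctx.out(), "{}", pool->pool_id);
        }
    };

    int main()
    {
        std::vector<PoolPtr> pools{std::make_shared<Pool>(Pool{1}), std::make_shared<Pool>(Pool{2})};
        fmt::print("pool_ids={}\n", pools); // prints: pool_ids=[1, 2]
        return 0;
    }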