Skip to content

Commit

Permalink
Fix AsyncTasks cancel deadlock (#8953)
Browse files Browse the repository at this point in the history
close #8952
  • Loading branch information
CalvinNeo authored Apr 17, 2024
1 parent 8a09075 commit 14a1278
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 10 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"Raft read index events counter", \
Counter, \
F(type_bypass_lock, {{"type", "bypass_lock"}}), \
F(type_zero_read_tso, {{"type", "zero_read_tso"}}), \
F(type_use_histroy, {{"type", "use_histroy"}}), \
F(type_use_cache, {{"type", "use_cache"}})) \
M(tiflash_raft_learner_read_failures_count, \
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Interpreters/SharedContexts/Disagg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,13 @@ void SharedContextDisagg::initFastAddPeerContext(UInt64 fap_concur)
fap_context = std::make_shared<FastAddPeerContext>(fap_concur);
}


SharedContextDisagg::~SharedContextDisagg()
{
if (fap_context)
{
fap_context->shutdown();
}
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Interpreters/SharedContexts/Disagg.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ struct SharedContextDisagg : private boost::noncopyable
: global_context(global_context_)
{}

~SharedContextDisagg();

void initReadNodePageCache(const PathPool & path_pool, const String & cache_dir, size_t cache_capacity);

/// Note that the unit of max_size is quantity, not byte size. It controls how
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ KVStore::~KVStore()
LOG_INFO(log, "Destroy KVStore");
stopThreadAllocInfo();
releaseReadIndexWorkers();
LOG_INFO(log, "Destroy KVStore Finished");
}

FileUsageStatistics KVStore::getFileUsageStatistics() const
Expand Down Expand Up @@ -611,6 +612,7 @@ void KVStore::stopThreadAllocInfo()
is_terminated = true;
monitoring_cv.notify_all();
}
LOG_INFO(log, "KVStore shutdown, wait thread alloc monitor join");
monitoring_thread->join();
delete monitoring_thread;
monitoring_thread = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ FastAddPeerContext::FastAddPeerContext(uint64_t thread_count)
tasks_trace = std::make_shared<FAPAsyncTasks>(thread_count, thread_count, 1000);
}

void FastAddPeerContext::shutdown() const
{
tasks_trace->shutdown();
}

ParsedCheckpointDataHolderPtr FastAddPeerContext::CheckpointCacheElement::getParsedCheckpointData(Context & context)
{
std::scoped_lock<std::mutex> lock(mu);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

namespace DB
{
class FastAddPeerContext;
using FAPAsyncTasks = AsyncTasks<uint64_t, std::function<FastAddPeerRes()>, FastAddPeerRes>;
struct CheckpointInfo;
using CheckpointInfoPtr = std::shared_ptr<CheckpointInfo>;
Expand All @@ -34,6 +35,7 @@ class FastAddPeerContext
{
public:
explicit FastAddPeerContext(uint64_t thread_count = 0);
void shutdown() const;

// Return parsed checkpoint data and its corresponding seq which is newer than `required_seq` if exists, otherwise return pair<required_seq, nullptr>
std::pair<UInt64, ParsedCheckpointDataHolderPtr> getNewerCheckpointData(
Expand Down
24 changes: 16 additions & 8 deletions dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ std::vector<kvrpcpb::ReadIndexRequest> LearnerReadWorker::buildBatchReadIndexReq
// If using `std::numeric_limits<uint64_t>::max()`, set `start-ts` 0 to get the latest index but let read-index-worker do not record as history.
auto read_index_tso
= mvcc_query_info.read_tso == std::numeric_limits<uint64_t>::max() ? 0 : mvcc_query_info.read_tso;
if (read_index_tso == 0)
{
GET_METRIC(tiflash_raft_read_index_events_count, type_zero_read_tso).Increment();
}
for (const auto & region_to_query : regions_info)
{
const RegionID region_id = region_to_query.region_id;
Expand Down Expand Up @@ -325,13 +329,14 @@ RegionsReadIndexResult LearnerReadWorker::readIndex(
log_lvl,
"[Learner Read] Batch read index, num_regions={} num_requests={} num_stale_read={} num_cached_index={} "
"num_unavailable={} "
"cost={}ms",
"cost={}ms, read_tso={}",
stats.num_regions,
stats.num_read_index_request,
stats.num_stale_read,
stats.num_cached_read_index,
unavailable_regions.size(),
stats.read_index_elapsed_ms);
stats.read_index_elapsed_ms,
mvcc_query_info.read_tso);

return batch_read_index_result;
}
Expand Down Expand Up @@ -427,10 +432,11 @@ void LearnerReadWorker::waitIndex(
LOG_IMPL(
log,
log_lvl,
"[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}",
"[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}, read_tso={}",
stats.wait_index_elapsed_ms,
stats.num_regions,
unavailable_regions.size());
unavailable_regions.size(),
mvcc_query_info.read_tso);

auto bypass_formatter = [&](const RegionQueryInfo & query_info) -> String {
if (query_info.bypass_lock_ts == nullptr)
Expand Down Expand Up @@ -464,9 +470,10 @@ void LearnerReadWorker::waitIndex(

LOG_DEBUG(
log,
"[Learner Read] Learner Read Summary, regions_info={}, unavailable_regions_info={}",
"[Learner Read] Learner Read Summary, regions_info={}, unavailable_regions_info={}, read_tso={}",
region_info_formatter(),
unavailable_regions.toDebugString());
unavailable_regions.toDebugString(),
mvcc_query_info.read_tso);
}

std::tuple<Clock::time_point, Clock::time_point> //
Expand Down Expand Up @@ -505,13 +512,14 @@ LearnerReadWorker::waitUntilDataAvailable(
log,
log_lvl,
"[Learner Read] batch read index | wait index"
" total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}",
" total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}, read_tso={}",
time_elapsed_ms,
stats.read_index_elapsed_ms,
stats.wait_index_elapsed_ms,
stats.num_regions,
stats.num_stale_read,
unavailable_regions.size());
unavailable_regions.size(),
mvcc_query_info.read_tso);
return {start_time, end_time};
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/Read/ReadIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ void KVStore::stopReadIndexWorkers() const

void KVStore::releaseReadIndexWorkers()
{
LOG_INFO(log, "KVStore shutdown, deleting read index worker");
if (read_index_worker_manager)
{
delete read_index_worker_manager;
Expand Down
22 changes: 21 additions & 1 deletion dbms/src/Storages/KVStore/Utils/AsyncTasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,24 @@ struct AsyncTasks
, log(DB::Logger::get())
{}

~AsyncTasks() { LOG_INFO(log, "Pending {} tasks when destructing", count()); }
void shutdown()
{
LOG_INFO(log, "Pending {} tasks when destructing", count());
// To avoid the "last owner" problem in worker thread.
thread_pool->wait();
shut.store(true);
LOG_INFO(log, "Finish finalize thread pool");
}

~AsyncTasks()
{
if (!shut.load())
{
LOG_INFO(log, "Destruct without shutdown");
// Potential deadlock if the instance is held and released directly or indirectly by a task in its worker.
shutdown();
}
}

using TaskState = AsyncTaskHelper::TaskState;

Expand Down Expand Up @@ -241,6 +258,8 @@ struct AsyncTasks
// 1. There is already a task registered with the same name and not canceled or fetched.
bool addTaskWithCancel(Key k, Func f, CancelFunc cf)
{
if (shut.load())
return false;
std::scoped_lock l(mtx);
RUNTIME_CHECK(!tasks.contains(k));
using P = std::packaged_task<R()>;
Expand Down Expand Up @@ -399,5 +418,6 @@ struct AsyncTasks
std::unique_ptr<ThreadPool> thread_pool;
mutable std::mutex mtx;
LoggerPtr log;
std::atomic_bool shut = false;
};
} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class RegionKVStoreTestFAP : public KVStoreTestBase
{
auto & global_context = TiFlashTestEnv::getGlobalContext();
KVStoreTestBase::TearDown();
global_context.getSharedContextDisagg()->fap_context.reset();
global_context.getSharedContextDisagg()->fap_context->shutdown();
if (!already_initialize_data_store)
{
global_context.getSharedContextDisagg()->remote_data_store = nullptr;
Expand Down

0 comments on commit 14a1278

Please sign in to comment.