Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix AsyncTasks cancel deadlock (#8953) #8960

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dbms/src/Interpreters/SharedContexts/Disagg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,13 @@ void SharedContextDisagg::initFastAddPeerContext(UInt64 fap_concur)
fap_context = std::make_shared<FastAddPeerContext>(fap_concur);
}


/// Destructor: if a fast-add-peer context exists, call its `shutdown()`
/// explicitly here, i.e. before the members of this object are destroyed.
SharedContextDisagg::~SharedContextDisagg()
{
    // Nothing to stop when no context was created.
    if (!fap_context)
        return;

    fap_context->shutdown();
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Interpreters/SharedContexts/Disagg.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ struct SharedContextDisagg : private boost::noncopyable
: global_context(global_context_)
{}

~SharedContextDisagg();

void initReadNodePageCache(const PathPool & path_pool, const String & cache_dir, size_t cache_capacity);

/// Note that the unit of max_size is quantity, not byte size. It controls how
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ KVStore::~KVStore()
LOG_INFO(log, "Destroy KVStore");
stopThreadAllocInfo();
releaseReadIndexWorkers();
LOG_INFO(log, "Destroy KVStore Finished");
}

FileUsageStatistics KVStore::getFileUsageStatistics() const
Expand Down Expand Up @@ -611,6 +612,7 @@ void KVStore::stopThreadAllocInfo()
is_terminated = true;
monitoring_cv.notify_all();
}
LOG_INFO(log, "KVStore shutdown, wait thread alloc monitor join");
monitoring_thread->join();
delete monitoring_thread;
monitoring_thread = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ FastAddPeerContext::FastAddPeerContext(uint64_t thread_count)
tasks_trace = std::make_shared<FAPAsyncTasks>(thread_count, thread_count, 1000);
}

/// Forwards the shutdown request to the async task pool held in `tasks_trace`.
void FastAddPeerContext::shutdown() const
{
    auto & async_tasks = *tasks_trace;
    async_tasks.shutdown();
}

ParsedCheckpointDataHolderPtr FastAddPeerContext::CheckpointCacheElement::getParsedCheckpointData(Context & context)
{
std::scoped_lock<std::mutex> lock(mu);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

namespace DB
{
class FastAddPeerContext;
using FAPAsyncTasks = AsyncTasks<uint64_t, std::function<FastAddPeerRes()>, FastAddPeerRes>;
struct CheckpointInfo;
using CheckpointInfoPtr = std::shared_ptr<CheckpointInfo>;
Expand All @@ -34,6 +35,7 @@ class FastAddPeerContext
{
public:
explicit FastAddPeerContext(uint64_t thread_count = 0);
void shutdown() const;

// Return the parsed checkpoint data and its corresponding seq if one newer than `required_seq` exists; otherwise return pair<required_seq, nullptr>.
std::pair<UInt64, ParsedCheckpointDataHolderPtr> getNewerCheckpointData(
Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,14 @@ RegionsReadIndexResult LearnerReadWorker::readIndex(
log_lvl,
"[Learner Read] Batch read index, num_regions={} num_requests={} num_stale_read={} num_cached_index={} "
"num_unavailable={} "
"cost={}ms",
"cost={}ms, read_tso={}",
stats.num_regions,
stats.num_read_index_request,
stats.num_stale_read,
stats.num_cached_read_index,
unavailable_regions.size(),
stats.read_index_elapsed_ms);
stats.read_index_elapsed_ms,
mvcc_query_info.read_tso);

return batch_read_index_result;
}
Expand Down Expand Up @@ -427,10 +428,11 @@ void LearnerReadWorker::waitIndex(
LOG_IMPL(
log,
log_lvl,
"[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}",
"[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}, read_tso={}",
stats.wait_index_elapsed_ms,
stats.num_regions,
unavailable_regions.size());
unavailable_regions.size(),
mvcc_query_info.read_tso);
}

std::tuple<Clock::time_point, Clock::time_point> //
Expand Down Expand Up @@ -469,13 +471,14 @@ LearnerReadWorker::waitUntilDataAvailable(
log,
log_lvl,
"[Learner Read] batch read index | wait index"
" total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}",
" total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}, read_tso={}",
time_elapsed_ms,
stats.read_index_elapsed_ms,
stats.wait_index_elapsed_ms,
stats.num_regions,
stats.num_stale_read,
unavailable_regions.size());
unavailable_regions.size(),
mvcc_query_info.read_tso);
return {start_time, end_time};
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/Read/ReadIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ void KVStore::stopReadIndexWorkers() const

void KVStore::releaseReadIndexWorkers()
{
LOG_INFO(log, "KVStore shutdown, deleting read index worker");
if (read_index_worker_manager)
{
delete read_index_worker_manager;
Expand Down
22 changes: 21 additions & 1 deletion dbms/src/Storages/KVStore/Utils/AsyncTasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,24 @@ struct AsyncTasks
, log(DB::Logger::get())
{}

// Destructor body only logs the value of count() as the number of pending tasks.
~AsyncTasks() { LOG_INFO(log, "Pending {} tasks when destructing", count()); }
/// Waits on the thread pool and then marks this instance as shut down.
/// Once `shut` is set, `addTaskWithCancel` returns false instead of registering a task.
void shutdown()
{
// Report the current value of count() before waiting.
LOG_INFO(log, "Pending {} tasks when destructing", count());
// To avoid the "last owner" problem in worker thread.
// Presumably blocks until the scheduled jobs have finished — confirm against ThreadPool::wait.
thread_pool->wait();
// NOTE(review): the flag is stored only after wait() returns, so addTaskWithCancel
// still accepts tasks while the wait is in progress — confirm this is intended.
shut.store(true);
LOG_INFO(log, "Finish finalize thread pool");
}

/// Destructor: falls back to `shutdown()` when it has not been completed
/// explicitly (i.e. `shut` is still false).
~AsyncTasks()
{
    // shutdown() already ran to completion; nothing left to do.
    if (shut.load())
        return;

    LOG_INFO(log, "Destruct without shutdown");
    // Potential deadlock if the instance is held and released directly or indirectly by a task in its worker.
    shutdown();
}

using TaskState = AsyncTaskHelper::TaskState;

Expand Down Expand Up @@ -241,6 +258,8 @@ struct AsyncTasks
// 1. There is already a task registered with the same name and not canceled or fetched.
bool addTaskWithCancel(Key k, Func f, CancelFunc cf)
{
if (shut.load())
return false;
std::scoped_lock l(mtx);
RUNTIME_CHECK(!tasks.contains(k));
using P = std::packaged_task<R()>;
Expand Down Expand Up @@ -399,5 +418,6 @@ struct AsyncTasks
std::unique_ptr<ThreadPool> thread_pool;
mutable std::mutex mtx;
LoggerPtr log;
std::atomic_bool shut = false;
};
} // namespace DB
37 changes: 26 additions & 11 deletions dbms/src/Storages/KVStore/tests/gtest_async_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,50 @@ TEST(AsyncTasksTest, AsyncTasksNormal)
auto log = DB::Logger::get();
LOG_INFO(log, "Cancel and addTask");
// Cancel and addTask
// 3 -> 1 -> 4 -> 2
{
auto async_tasks = std::make_unique<TestAsyncTasks>(1, 1, 2);
auto m = std::make_shared<std::mutex>();
int flag = 0;
std::unique_lock cl(*m);
std::atomic_bool finished_flag = false;
async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag]() {
std::atomic_bool running_flag = false;
async_tasks->addTask(1, [m, &flag, &async_tasks, &finished_flag, &running_flag]() {
running_flag.store(true, std::memory_order_seq_cst);
auto cancel_handle = async_tasks->getCancelHandleFromExecutor(1);
std::scoped_lock rl(*m); // 2
SCOPE_EXIT({ finished_flag.store(true); });
std::scoped_lock rl(*m);
SCOPE_EXIT({ finished_flag.store(true, std::memory_order_seq_cst); });
// Run after `cl` is released.
if (cancel_handle->isCanceled())
{
return;
}
flag = 1;
});
ASSERT_TRUE(async_tasks->isScheduled(1));
{
int cnt_wait_sche = 0;
while (!running_flag.load(std::memory_order_seq_cst))
{
cnt_wait_sche += 1;
ASSERT(cnt_wait_sche < 6);
std::this_thread::sleep_for(200ms);
}
}
// Make sure we don't cancel in queue.
async_tasks->asyncCancelTask(1);
// The task is not registered anymore.
ASSERT_FALSE(async_tasks->isScheduled(1));
async_tasks->addTask(1, [&flag]() { flag = 2; });
cl.unlock(); // Now can task 1 run.
int count = 0;
using namespace std::chrono_literals;
while (!finished_flag.load())
cl.unlock();
{
count += 1;
ASSERT(count < 6);
std::this_thread::sleep_for(200ms);
int cnt_wait_finish = 0;
using namespace std::chrono_literals;
while (!finished_flag.load(std::memory_order_seq_cst))
{
cnt_wait_finish += 1;
ASSERT(cnt_wait_finish < 6);
std::this_thread::sleep_for(200ms);
}
}
ASSERT_NO_THROW(async_tasks->fetchResult(1));
ASSERT_EQ(flag, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class RegionKVStoreTestFAP : public KVStoreTestBase
{
auto & global_context = TiFlashTestEnv::getGlobalContext();
KVStoreTestBase::TearDown();
global_context.getSharedContextDisagg()->fap_context.reset();
global_context.getSharedContextDisagg()->fap_context->shutdown();
if (!already_initialize_data_store)
{
global_context.getSharedContextDisagg()->remote_data_store = nullptr;
Expand Down