Skip to content

Commit

Permalink
Refine cancelMPPQuery (#5361)
Browse files: browse the repository at this point in the history
ref #5095
  • Loading branch information
windtalker authored Jul 14, 2022
1 parent f65240e commit f97567a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 32 deletions.
14 changes: 9 additions & 5 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ MPPTask::~MPPTask()
{
/// the threads of this task are not fully freed now, since the BlockIO and DAGContext are not destructed
/// TODO: finish all threads before here, except the current one.
manager->releaseThreadsFromScheduler(needed_threads);
manager.load()->releaseThreadsFromScheduler(needed_threads);
schedule_state = ScheduleState::COMPLETED;
}
LOG_FMT_DEBUG(log, "finish MPPTask: {}", id.toString());
Expand Down Expand Up @@ -212,10 +212,11 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn

void MPPTask::unregisterTask()
{
if (manager != nullptr)
auto * manager_ptr = manager.load();
if (manager_ptr != nullptr)
{
LOG_DEBUG(log, "task unregistered");
manager->unregisterTask(this);
manager_ptr->unregisterTask(this);
}
else
{
Expand Down Expand Up @@ -435,7 +436,10 @@ void MPPTask::runImpl()

void MPPTask::handleError(const String & error_msg)
{
if (manager == nullptr || !manager->isTaskToBeCancelled(id))
auto * manager_ptr = manager.load();
/// if manager_ptr is not nullptr, it means the task has already been registered,
/// MPPTaskManager::cancelMPPQuery will handle it properly if the query is to be cancelled.
if (manager_ptr == nullptr || !manager_ptr->isQueryToBeCancelled(id.start_ts))
abort(error_msg, AbortType::ONERROR);
}

Expand Down Expand Up @@ -503,7 +507,7 @@ bool MPPTask::switchStatus(TaskStatus from, TaskStatus to)

void MPPTask::scheduleOrWait()
{
if (!manager->tryToScheduleTask(shared_from_this()))
if (!manager.load()->tryToScheduleTask(shared_from_this()))
{
LOG_FMT_INFO(log, "task waits for schedule");
Stopwatch stopwatch;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

int new_thread_count_of_exchange_receiver = 0;

MPPTaskManager * manager = nullptr;
std::atomic<MPPTaskManager *> manager = nullptr;

const LoggerPtr log;

Expand Down
44 changes: 20 additions & 24 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,16 @@ void MPPTaskManager::cancelMPPQuery(UInt64 query_id, const String & reason)
/// one without holding the lock
std::lock_guard lock(mu);
auto it = mpp_query_map.find(query_id);
if (it == mpp_query_map.end() || it->second->to_be_cancelled)
if (it == mpp_query_map.end())
{
LOG_WARNING(log, fmt::format("{} does not found in task manager, skip cancel", query_id));
return;
}
else if (it->second->to_be_cancelled)
{
LOG_WARNING(log, fmt::format("{} already in cancel process, skip cancel", query_id));
return;
}
it->second->to_be_cancelled = true;
task_set = it->second;
scheduler->deleteQuery(query_id, *this, true);
Expand All @@ -90,30 +98,22 @@ void MPPTaskManager::cancelMPPQuery(UInt64 query_id, const String & reason)
FmtBuffer fmt_buf;
fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id);
// TODO: cancel tasks in order rather than issuing so many threads to cancel tasks
std::vector<std::thread> cancel_workers;
for (const auto & task : task_set->task_map)
auto thread_manager = newThreadManager();
for (auto it = task_set->task_map.begin(); it != task_set->task_map.end();)
{
fmt_buf.fmtAppend("{} ", task.first.toString());
std::thread t(&MPPTask::cancel, task.second, std::ref(reason));
cancel_workers.push_back(std::move(t));
fmt_buf.fmtAppend("{} ", it->first.toString());
auto current_task = it->second;
it = task_set->task_map.erase(it);
thread_manager->schedule(false, "CancelMPPTask", [task = std::move(current_task), &reason] { task->cancel(reason); });
}
LOG_WARNING(log, fmt_buf.toString());
for (auto & worker : cancel_workers)
{
worker.join();
}
MPPQueryTaskSetPtr canceled_task_set;
thread_manager->wait();
{
std::lock_guard lock(mu);
/// just to double check the query still exists
auto it = mpp_query_map.find(query_id);
/// just to double check the query still exists
if (it != mpp_query_map.end())
{
/// hold the canceled task set, so the mpp task will not be deconstruct when holding the
/// `mu` of MPPTaskManager, otherwise it might cause deadlock
canceled_task_set = it->second;
mpp_query_map.erase(it);
}
}
LOG_WARNING(log, "Finish cancel query: " + std::to_string(query_id));
}
Expand Down Expand Up @@ -147,15 +147,11 @@ bool MPPTaskManager::registerTask(MPPTaskPtr task)
return true;
}

bool MPPTaskManager::isTaskToBeCancelled(const MPPTaskId & task_id)
bool MPPTaskManager::isQueryToBeCancelled(UInt64 query_id)
{
std::unique_lock lock(mu);
auto it = mpp_query_map.find(task_id.start_ts);
if (it != mpp_query_map.end() && it->second->to_be_cancelled)
{
return it->second->task_map.find(task_id) != it->second->task_map.end();
}
return false;
auto it = mpp_query_map.find(query_id);
return it != mpp_query_map.end() && it->second->to_be_cancelled;
}

void MPPTaskManager::unregisterTask(MPPTask * task)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace DB
struct MPPQueryTaskSet
{
/// to_be_cancelled is a kind of lock: if to_be_cancelled is set
/// to true, then task_map can only be modified by query cancel
/// to true, then task_map can only be accessed by query cancel
/// thread, which means no task can register/un-register for the
/// query. Here we do not need a mutex because all writes/reads
/// to MPPQueryTaskSet are protected by the mutex in MPPTaskManager
Expand Down Expand Up @@ -73,7 +73,7 @@ class MPPTaskManager : private boost::noncopyable

void unregisterTask(MPPTask * task);

bool isTaskToBeCancelled(const MPPTaskId & task_id);
bool isQueryToBeCancelled(UInt64 query_id);

bool tryToScheduleTask(const MPPTaskPtr & task);

Expand Down

0 comments on commit f97567a

Please sign in to comment.