From ac9f7824e95e1524010620c8a7caaca1eee5cc81 Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 22 Jun 2022 16:31:37 +0800 Subject: [PATCH 01/10] add error handle method in MPPTask Signed-off-by: xufei --- .../ParallelAggregatingBlockInputStream.cpp | 4 +- dbms/src/DataStreams/UnionBlockInputStream.h | 4 +- dbms/src/Flash/Mpp/MPPHandler.cpp | 2 +- dbms/src/Flash/Mpp/MPPTask.cpp | 94 +++++++++++++++---- dbms/src/Flash/Mpp/MPPTask.h | 16 +++- dbms/src/Flash/Mpp/TaskStatus.cpp | 2 + dbms/src/Flash/Mpp/TaskStatus.h | 1 + 7 files changed, 97 insertions(+), 26 deletions(-) diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index f4f8dfc1338..f983de91b37 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -198,8 +198,8 @@ void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_pt /// can not cancel parent inputStream or the exception might be lost if (!parent.executed) - /// kill the processor so ExchangeReceiver will be closed - parent.processor.cancel(true); + /// use cancel instead of kill to avoid too many useless error message + parent.processor.cancel(false); } diff --git a/dbms/src/DataStreams/UnionBlockInputStream.h b/dbms/src/DataStreams/UnionBlockInputStream.h index 251d0663e14..a782c3dd087 100644 --- a/dbms/src/DataStreams/UnionBlockInputStream.h +++ b/dbms/src/DataStreams/UnionBlockInputStream.h @@ -293,8 +293,8 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream /// and the exception is lost. output_queue.emplace(exception); /// can not cancel itself or the exception might be lost - /// kill the processor so ExchangeReceiver will be closed - processor.cancel(true); + /// use cancel instead of kill to avoid too many useless error message + processor.cancel(false); } struct Handler diff --git a/dbms/src/Flash/Mpp/MPPHandler.cpp b/dbms/src/Flash/Mpp/MPPHandler.cpp index a3096aaa644..7f97a1dd698 100644 --- a/dbms/src/Flash/Mpp/MPPHandler.cpp +++ b/dbms/src/Flash/Mpp/MPPHandler.cpp @@ -31,7 +31,7 @@ void MPPHandler::handleError(const MPPTaskPtr & task, String error) try { if (task) - task->cancel(error); + task->handleError(error); } catch (...) { diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index ac084ba4550..eff59f19489 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -81,6 +81,39 @@ MPPTask::~MPPTask() LOG_FMT_DEBUG(log, "finish MPPTask: {}", id.toString()); } +void MPPTask::abortTunnels(const String & message, AbortType abort_type) +{ + switch (abort_type) + { + case AbortType::ONCANCELLATION: + closeAllTunnels(message); + break; + case AbortType::ONERROR: + RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set"); + tunnel_set->writeError(message); + break; + } +} + +void MPPTask::abortReceivers(const String &, AbortType) +{ + cancelAllReceivers(); +} + +void MPPTask::abortDataStreams(const String &, AbortType abort_type) +{ + switch (abort_type) + { + case AbortType::ONCANCELLATION: + context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true); + break; + case AbortType::ONERROR: + /// When abort type is ONERROR, it means MPPTask already known it meet error, so let the remaining task stop silently to avoid too many useless error message + context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, false); + break; + } +} + void MPPTask::closeAllTunnels(const String & reason) { if (likely(tunnel_set)) @@ -390,9 +423,14 @@ void MPPTask::runImpl() } else { - context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true); - cancelAllReceivers(); - writeErrToAllTunnels(err_msg); + try + { + handleError(err_msg); + } + catch (...) + { + tryLogCurrentException(log, "Meet error while try to handle error in MPPTask"); + } } LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds()); // unregister flag is only for FailPoint usage, to produce the situation that MPPTask is destructed @@ -408,49 +446,67 @@ void MPPTask::runImpl() if (switchStatus(RUNNING, FINISHED)) LOG_INFO(log, "finish task"); else - LOG_WARNING(log, "finish task which was cancelled before"); + LOG_FMT_WARNING(log, "finish task which is in {} state", taskStatusToString(status)); mpp_task_statistics.end(status.load(), err_msg); mpp_task_statistics.logTracingJson(); } -void MPPTask::writeErrToAllTunnels(const String & e) +void MPPTask::handleError(const String & error_msg) { - RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set"); - tunnel_set->writeError(e); + abort(error_msg, AbortType::ONERROR); } -void MPPTask::cancel(const String & reason) +void MPPTask::abort(const String & message, AbortType abort_type) { - CPUAffinityManager::getInstance().bindSelfQueryThread(); - LOG_FMT_WARNING(log, "Begin cancel task: {}", id.toString()); + String abort_type_string; + TaskStatus next_task_status; + switch (abort_type) + { + case AbortType::ONCANCELLATION: + abort_type_string = "ONCANCELLATION"; + next_task_status = CANCELLED; + break; + case AbortType::ONERROR: + abort_type_string = "ONERROR"; + next_task_status = FAILED; + break; + } + LOG_FMT_WARNING(log, "Begin abort task: {}, abort type: {}", id.toString(), abort_type_string); while (true) { auto previous_status = status.load(); - if (previous_status == FINISHED || previous_status == CANCELLED) + if (previous_status == FINISHED || previous_status == CANCELLED || previous_status == FAILED) { - LOG_FMT_WARNING(log, "task already {}", (previous_status == FINISHED ? "finished" : "cancelled")); + LOG_FMT_WARNING(log, "task already in {} state", taskStatusToString(previous_status)); return; } - else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, CANCELLED)) + else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, next_task_status)) { - closeAllTunnels(reason); + abortTunnels(message, abort_type); unregisterTask(); - LOG_WARNING(log, "Finish cancel task from uninitialized"); + LOG_WARNING(log, "Finish abort task from uninitialized"); return; } - else if (previous_status == RUNNING && switchStatus(RUNNING, CANCELLED)) + else if (previous_status == RUNNING && switchStatus(RUNNING, next_task_status)) { + abortTunnels(message, abort_type); + abortDataStreams(message, abort_type); + abortReceivers(message, abort_type); scheduleThisTask(ScheduleState::FAILED); - context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true); - closeAllTunnels(reason); /// runImpl is running, leave remaining work to runImpl - LOG_WARNING(log, "Finish cancel task from running"); + LOG_WARNING(log, "Finish abort task from running"); return; } } } +void MPPTask::cancel(const String & reason) +{ + CPUAffinityManager::getInstance().bindSelfQueryThread(); + abort(reason, AbortType::ONCANCELLATION); +} + bool MPPTask::switchStatus(TaskStatus from, TaskStatus to) { return status.compare_exchange_strong(from, to); diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h index d7e5ed169de..137eaca8e86 100644 --- a/dbms/src/Flash/Mpp/MPPTask.h +++ b/dbms/src/Flash/Mpp/MPPTask.h @@ -59,6 +59,8 @@ class MPPTask : public std::enable_shared_from_this void cancel(const String & reason); + void handleError(const String & error_msg); + void prepare(const mpp::DispatchTaskRequest & task_request); void run(); @@ -90,12 +92,22 @@ class MPPTask : public std::enable_shared_from_this void unregisterTask(); - void writeErrToAllTunnels(const String & e); - /// Similar to `writeErrToAllTunnels`, but it just try to write the error message to tunnel /// without waiting the tunnel to be connected void closeAllTunnels(const String & reason); + enum class AbortType + { + /// todo add ONKILL to distinguish between silent cancellation and kill + ONCANCELLATION, + ONERROR, + }; + void abort(const String & message, AbortType abort_type); + + void abortTunnels(const String & message, AbortType abort_type); + void abortReceivers(const String & message, AbortType abort_type); + void abortDataStreams(const String & message, AbortType abort_type); + void finishWrite(); bool switchStatus(TaskStatus from, TaskStatus to); diff --git a/dbms/src/Flash/Mpp/TaskStatus.cpp b/dbms/src/Flash/Mpp/TaskStatus.cpp index 423b768faea..c87ae2b8eb4 100644 --- a/dbms/src/Flash/Mpp/TaskStatus.cpp +++ b/dbms/src/Flash/Mpp/TaskStatus.cpp @@ -29,6 +29,8 @@ StringRef taskStatusToString(const TaskStatus & status) return "FINISHED"; case CANCELLED: return "CANCELLED"; + case FAILED: + return "FAILED"; default: throw Exception("Unknown TaskStatus"); } diff --git a/dbms/src/Flash/Mpp/TaskStatus.h b/dbms/src/Flash/Mpp/TaskStatus.h index 999e30790bf..0997c8adc52 100644 --- a/dbms/src/Flash/Mpp/TaskStatus.h +++ b/dbms/src/Flash/Mpp/TaskStatus.h @@ -24,6 +24,7 @@ enum TaskStatus RUNNING, FINISHED, CANCELLED, + FAILED, }; StringRef taskStatusToString(const TaskStatus & status); From 694221b2535e9b60758de6212d00dab7aaa98a0d Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 23 Jun 2022 09:23:50 +0800 Subject: [PATCH 02/10] refine Signed-off-by: xufei --- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 2 +- dbms/src/Flash/Mpp/MPPTask.cpp | 21 ++++++++------------- dbms/src/Flash/Mpp/MPPTask.h | 6 ++---- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index ec8bde51469..3b36adf2c40 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -358,7 +358,7 @@ template void ExchangeReceiverBase::cancel() { setEndState(ExchangeReceiverState::CANCELED); - msg_channel.finish(); + msg_channel.cancel(); } template diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index eff59f19489..0406ef4a30e 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -95,12 +95,15 @@ void MPPTask::abortTunnels(const String & message, AbortType abort_type) } } -void MPPTask::abortReceivers(const String &, AbortType) +void MPPTask::abortReceivers() { - cancelAllReceivers(); + if (likely(receiver_set != nullptr)) + { + receiver_set->cancel(); + } } -void MPPTask::abortDataStreams(const String &, AbortType abort_type) +void MPPTask::abortDataStreams(AbortType abort_type) { switch (abort_type) { @@ -189,14 +192,6 @@ void MPPTask::initExchangeReceivers() dag_context->setMPPReceiverSet(receiver_set); } -void MPPTask::cancelAllReceivers() -{ - if (likely(receiver_set != nullptr)) - { - receiver_set->cancel(); - } -} - std::pair MPPTask::getTunnel(const ::mpp::EstablishMPPConnectionRequest * request) { if (status == CANCELLED) @@ -491,8 +486,8 @@ void MPPTask::abort(const String & message, AbortType abort_type) else if (previous_status == RUNNING && switchStatus(RUNNING, next_task_status)) { abortTunnels(message, abort_type); - abortDataStreams(message, abort_type); - abortReceivers(message, abort_type); + abortDataStreams(abort_type); + abortReceivers(); scheduleThisTask(ScheduleState::FAILED); /// runImpl is running, leave remaining work to runImpl LOG_WARNING(log, "Finish abort task from running"); diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h index 137eaca8e86..3bc77d0f239 100644 --- a/dbms/src/Flash/Mpp/MPPTask.h +++ b/dbms/src/Flash/Mpp/MPPTask.h @@ -105,8 +105,8 @@ class MPPTask : public std::enable_shared_from_this void abort(const String & message, AbortType abort_type); void abortTunnels(const String & message, AbortType abort_type); - void abortReceivers(const String & message, AbortType abort_type); - void abortDataStreams(const String & message, AbortType abort_type); + void abortReceivers(); + void abortDataStreams(AbortType abort_type); void finishWrite(); @@ -122,8 +122,6 @@ class MPPTask : public std::enable_shared_from_this void initExchangeReceivers(); - void cancelAllReceivers(); - tipb::DAGRequest dag_req; ContextPtr context; From e7270f7a6b11e0ca854c09caffe1432603bcd5f0 Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 23 Jun 2022 09:32:55 +0800 Subject: [PATCH 03/10] fix Signed-off-by: xufei --- dbms/src/Flash/Mpp/MPPTask.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 0406ef4a30e..d0cebac985a 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -105,16 +105,18 @@ void MPPTask::abortReceivers() void MPPTask::abortDataStreams(AbortType abort_type) { + bool is_kill; switch (abort_type) { case AbortType::ONCANCELLATION: - context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true); + is_kill = true; break; case AbortType::ONERROR: /// When abort type is ONERROR, it means MPPTask already known it meet error, so let the remaining task stop silently to avoid too many useless error message - context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, false); + is_kill = false; break; } + context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, is_kill); } void MPPTask::closeAllTunnels(const String & reason) From 492f5fc1ca48f6f8b0d74486368fbe42cb40cc3a Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 23 Jun 2022 10:28:29 +0800 Subject: [PATCH 04/10] refine Signed-off-by: xufei --- dbms/src/Flash/Mpp/MPPTask.cpp | 63 +++++++++++++++------------------- 1 file changed, 27 insertions(+), 36 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index d0cebac985a..1e85bf15a81 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -387,46 +387,42 @@ void MPPTask::runImpl() return_statistics.blocks, return_statistics.bytes); } - catch (Exception & e) - { - err_msg = e.displayText(); - LOG_FMT_ERROR(log, "task running meets error: {} Stack Trace : {}", err_msg, e.getStackTrace().toString()); - } - catch (pingcap::Exception & e) - { - err_msg = e.message(); - LOG_FMT_ERROR(log, "task running meets error: {}", err_msg); - } - catch (std::exception & e) - { - err_msg = e.what(); - LOG_FMT_ERROR(log, "task running meets error: {}", err_msg); - } catch (...) { - err_msg = "unrecovered error"; - LOG_FMT_ERROR(log, "task running meets error: {}", err_msg); + err_msg = getCurrentExceptionMessage(true); } + if (err_msg.empty()) { - // todo when error happens, should try to update the metrics if it is available - auto throughput = dag_context->getTableScanThroughput(); - if (throughput.first) - GET_METRIC(tiflash_storage_logical_throughput_bytes).Observe(throughput.second); - auto process_info = context->getProcessListElement()->getInfo(); - auto peak_memory = process_info.peak_memory_usage > 0 ? process_info.peak_memory_usage : 0; - GET_METRIC(tiflash_coprocessor_request_memory_usage, type_run_mpp_task).Observe(peak_memory); - mpp_task_statistics.setMemoryPeak(peak_memory); + if (switchStatus(RUNNING, FINISHED)) + LOG_INFO(log, "finish task"); + else + LOG_FMT_WARNING(log, "finish task which is in {} state", taskStatusToString(status)); + if (status == FINISHED) + { + // todo when error happens, should try to update the metrics if it is available + auto throughput = dag_context->getTableScanThroughput(); + if (throughput.first) + GET_METRIC(tiflash_storage_logical_throughput_bytes).Observe(throughput.second); + auto process_info = context->getProcessListElement()->getInfo(); + auto peak_memory = process_info.peak_memory_usage > 0 ? process_info.peak_memory_usage : 0; + GET_METRIC(tiflash_coprocessor_request_memory_usage, type_run_mpp_task).Observe(peak_memory); + mpp_task_statistics.setMemoryPeak(peak_memory); + } } else { - try + if (status == RUNNING) { - handleError(err_msg); - } - catch (...) - { - tryLogCurrentException(log, "Meet error while try to handle error in MPPTask"); + LOG_FMT_ERROR(log, "task running meets error: {}", err_msg); + try + { + handleError(err_msg); + } + catch (...) + { + tryLogCurrentException(log, "Meet error while try to handle error in MPPTask"); + } } } LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds()); @@ -440,11 +436,6 @@ void MPPTask::runImpl() if (unregister) unregisterTask(); - if (switchStatus(RUNNING, FINISHED)) - LOG_INFO(log, "finish task"); - else - LOG_FMT_WARNING(log, "finish task which is in {} state", taskStatusToString(status)); - mpp_task_statistics.end(status.load(), err_msg); mpp_task_statistics.logTracingJson(); } From ae778936f373b52f5c2fad56b8faf516388585d0 Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 23 Jun 2022 10:36:29 +0800 Subject: [PATCH 05/10] add comments Signed-off-by: xufei --- dbms/src/Flash/Mpp/MPPTask.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 1e85bf15a81..cc52c111ff6 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -478,6 +478,9 @@ void MPPTask::abort(const String & message, AbortType abort_type) } else if (previous_status == RUNNING && switchStatus(RUNNING, next_task_status)) { + /// abort the components from top to bottom because if bottom components are aborted + /// first, the top components may see an error caused by the abort, which is not + /// the original error abortTunnels(message, abort_type); abortDataStreams(abort_type); abortReceivers(); From c8f4530045c37298b603fe93d1836d23af41f212 Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 24 Jun 2022 13:52:00 +0800 Subject: [PATCH 06/10] address comments Signed-off-by: xufei --- dbms/src/Flash/Mpp/MPPTask.cpp | 7 +++++-- dbms/src/Flash/Mpp/MPPTask.h | 3 +-- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 11 +++++++++++ dbms/src/Flash/Mpp/MPPTaskManager.h | 2 ++ 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index cc52c111ff6..9a72ce7e281 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -436,13 +436,14 @@ void MPPTask::runImpl() if (unregister) unregisterTask(); - mpp_task_statistics.end(status.load(), err_msg); + mpp_task_statistics.end(status.load(), err_string); mpp_task_statistics.logTracingJson(); } void MPPTask::handleError(const String & error_msg) { - abort(error_msg, AbortType::ONERROR); + if (manager == nullptr || !manager->isTaskToBeCancelled(id)) + abort(error_msg, AbortType::ONERROR); } void MPPTask::abort(const String & message, AbortType abort_type) @@ -471,6 +472,7 @@ void MPPTask::abort(const String & message, AbortType abort_type) } else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, next_task_status)) { + err_string = message; abortTunnels(message, abort_type); unregisterTask(); LOG_WARNING(log, "Finish abort task from uninitialized"); @@ -481,6 +483,7 @@ void MPPTask::abort(const String & message, AbortType abort_type) /// abort the components from top to bottom because if bottom components are aborted /// first, the top components may see an error caused by the abort, which is not /// the original error + err_string = message; abortTunnels(message, abort_type); abortDataStreams(abort_type); abortReceivers(); diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h index 3bc77d0f239..a30150b26e8 100644 --- a/dbms/src/Flash/Mpp/MPPTask.h +++ b/dbms/src/Flash/Mpp/MPPTask.h @@ -131,6 +131,7 @@ class MPPTask : public std::enable_shared_from_this MemoryTracker * memory_tracker = nullptr; std::atomic status{INITIALIZING}; + String err_string; mpp::TaskMeta meta; @@ -148,8 +149,6 @@ class MPPTask : public std::enable_shared_from_this MPPTaskStatistics mpp_task_statistics; - Exception err; - friend class MPPTaskManager; int needed_threads; diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 3df4af5de5f..c5499eda89d 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -147,6 +147,17 @@ bool MPPTaskManager::registerTask(MPPTaskPtr task) return true; } +bool MPPTaskManager::isTaskToBeCancelled(const MPPTaskId & task_id) +{ + std::unique_lock lock(mu); + auto it = mpp_query_map.find(task_id.start_ts); + if (it != mpp_query_map.end() && it->second->to_be_cancelled) + { + return it->second->task_map.find(task_id) != it->second->task_map.end(); + } + return false; +} + void MPPTaskManager::unregisterTask(MPPTask * task) { std::unique_lock lock(mu); diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index d7047804aca..770acea3853 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -73,6 +73,8 @@ class MPPTaskManager : private boost::noncopyable void unregisterTask(MPPTask * task); + bool isTaskToBeCancelled(const MPPTaskId & task_id); + bool tryToScheduleTask(const MPPTaskPtr & task); void releaseThreadsFromScheduler(const int needed_threads); From d42c78701499e6cd23c9f266913143a8318644fa Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 24 Jun 2022 13:55:26 +0800 Subject: [PATCH 07/10] address comments Signed-off-by: xufei --- dbms/src/Flash/Mpp/MPPTask.cpp | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 9a72ce7e281..c184eefe6cd 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -83,15 +83,14 @@ MPPTask::~MPPTask() void MPPTask::abortTunnels(const String & message, AbortType abort_type) { - switch (abort_type) + if (abort_type == AbortType::ONCANCELLATION) { - case AbortType::ONCANCELLATION: closeAllTunnels(message); - break; - case AbortType::ONERROR: + } + else + { RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set"); tunnel_set->writeError(message); - break; } } @@ -105,17 +104,8 @@ void MPPTask::abortReceivers() void MPPTask::abortDataStreams(AbortType abort_type) { - bool is_kill; - switch (abort_type) - { - case AbortType::ONCANCELLATION: - is_kill = true; - break; - case AbortType::ONERROR: - /// When abort type is ONERROR, it means MPPTask already known it meet error, so let the remaining task stop silently to avoid too many useless error message - is_kill = false; - break; - } + /// When abort type is ONERROR, it means MPPTask already known it meet error, so let the remaining task stop silently to avoid too many useless error message + bool is_kill = abort_type == AbortType::ONCANCELLATION; context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, is_kill); } From 3b475426c151d911b1498f7f347265113c270ece Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 29 Jun 2022 11:26:26 +0800 Subject: [PATCH 08/10] fix ci fail Signed-off-by: xufei --- dbms/src/Flash/Mpp/MPPTask.cpp | 4 +++- tests/fullstack-test/mpp/issue_2471.test | 6 ++++- tests/fullstack-test/mpp/mpp_fail.test | 30 ++++++++++++++++++++---- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index c184eefe6cd..c2d5e6f49f8 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -463,7 +463,9 @@ void MPPTask::abort(const String & message, AbortType abort_type) else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, next_task_status)) { err_string = message; - abortTunnels(message, abort_type); + /// if the task is in initializing state, mpp task can return error to TiDB directly, + /// so just close all tunnels here + closeAllTunnels(message); unregisterTask(); LOG_WARNING(log, "Finish abort task from uninitialized"); return; diff --git a/tests/fullstack-test/mpp/issue_2471.test b/tests/fullstack-test/mpp/issue_2471.test index 4a1528595e8..ca8a7c23361 100644 --- a/tests/fullstack-test/mpp/issue_2471.test +++ b/tests/fullstack-test/mpp/issue_2471.test @@ -35,7 +35,11 @@ mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_opt_bro => DBGInvoke __enable_fail_point(exception_in_creating_set_input_stream) mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_opt_broadcast_cartesian_join=2; select * from a as t1 left join a as t2 on t1.id = t2.id; -ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_in_creating_set_input_stream is triggered. +ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_in_creating_set_input_stream is triggered., e.what() = DB::Exception, Stack trace: +{#LINE} +{#LINE} +{#LINE} +{#LINE} => DBGInvoke __disable_fail_point(exception_in_creating_set_input_stream) diff --git a/tests/fullstack-test/mpp/mpp_fail.test b/tests/fullstack-test/mpp/mpp_fail.test index 7af5fef3f89..3f3002f59c5 100644 --- a/tests/fullstack-test/mpp/mpp_fail.test +++ b/tests/fullstack-test/mpp/mpp_fail.test @@ -71,20 +71,32 @@ ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_be ## exception during mpp run non root task => DBGInvoke __enable_fail_point(exception_during_mpp_non_root_task_run) mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; -ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Exchange receiver meet error : DB::Exception: Fail point FailPoints::exception_during_mpp_non_root_task_run is triggered. +ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText() = DB::Exception: Exchange receiver meet error : Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_during_mpp_non_root_task_run is triggered., e.what() = DB::Exception, Stack trace: +{#LINE} +{#LINE} +{#LINE} => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) ## exception during mpp run root task => DBGInvoke __enable_fail_point(exception_during_mpp_root_task_run) mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; -ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_during_mpp_root_task_run is triggered. +ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_during_mpp_root_task_run is triggered., e.what() = DB::Exception, Stack trace: +{#LINE} +{#LINE} +{#LINE} +{#LINE} => DBGInvoke __disable_fail_point(exception_during_mpp_root_task_run) ## exception during mpp write err to tunnel => DBGInvoke __enable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __enable_fail_point(exception_during_mpp_write_err_to_tunnel) mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; -ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Exchange receiver meet error : Failed to write error msg to tunnel +ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText() = DB::Exception: Exchange receiver meet error : Failed to write error msg to tunnel, e.what() = DB::Exception, Stack trace: +{#LINE} +{#LINE} +{#LINE} +{#LINE} +{#LINE} => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __disable_fail_point(exception_during_mpp_write_err_to_tunnel) @@ -92,7 +104,10 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Exchang => DBGInvoke __enable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __enable_fail_point(exception_during_mpp_close_tunnel) mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; -ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Exchange receiver meet error : DB::Exception: Fail point FailPoints::exception_during_mpp_non_root_task_run is triggered. +ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText() = DB::Exception: Exchange receiver meet error : Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_during_mpp_non_root_task_run is triggered., e.what() = DB::Exception, Stack trace: +{#LINE} +{#LINE} +{#LINE} => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __disable_fail_point(exception_during_mpp_close_tunnel) @@ -125,7 +140,12 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Exchang ## ensure build1, build2-probe1, probe2 in the CreatingSets, test the bug where build1 throw exception but not change the build state, thus block the build2-probe1, at last this query hangs. => DBGInvoke __enable_fail_point(exception_mpp_hash_build) mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; set @@tidb_broadcast_join_threshold_count=0; set @@tidb_broadcast_join_threshold_size=0; select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id; -ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_mpp_hash_build is triggered. +ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_mpp_hash_build is triggered., e.what() = DB::Exception, Stack trace: +{#LINE} +{#LINE} +{#LINE} +{#LINE} +{#LINE} => DBGInvoke __disable_fail_point(exception_mpp_hash_build) # Clean up. From dfc24cb12e72d8285114c17d4b150d86f309a8e8 Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 29 Jun 2022 11:26:54 +0800 Subject: [PATCH 09/10] fix ci fail Signed-off-by: xufei --- tests/run-test.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/tests/run-test.py b/tests/run-test.py index 843fe7c79b4..0eee2f5c995 100644 --- a/tests/run-test.py +++ b/tests/run-test.py @@ -29,6 +29,7 @@ UNFINISHED_1_PREFIX = '\t' UNFINISHED_2_PREFIX = ' ' WORD_PH = '{#WORD}' +LINE_PH = '{#LINE}' CURL_TIDB_STATUS_PREFIX = 'curl_tidb> ' verbose = False @@ -138,18 +139,22 @@ def match_ph_word(line): # TODO: Support more place holders, eg: {#NUMBER} def compare_line(line, template): - while True: - i = template.find(WORD_PH) - if i < 0: - return line == template - else: - if line[:i] != template[:i]: - return False - j = match_ph_word(line[i:]) - if j == 0: - return False - template = template[i + len(WORD_PH):] - line = line[i + j:] + l = template.find(LINE_PH) + if l >= 0: + return True + else: + while True: + i = template.find(WORD_PH) + if i < 0: + return line == template + else: + if line[:i] != template[:i]: + return False + j = match_ph_word(line[i:]) + if j == 0: + return False + template = template[i + len(WORD_PH):] + line = line[i + j:] class MySQLCompare: From 67f4a4bcc3809f665b9e2d0f3c3ad57df2ad020c Mon Sep 17 00:00:00 2001 From: xufei Date: Wed, 29 Jun 2022 13:25:04 +0800 Subject: [PATCH 10/10] fix ci Signed-off-by: xufei --- tests/fullstack-test/mpp/issue_2471.test | 4 ++++ tests/fullstack-test/mpp/mpp_fail.test | 20 ++++++++++++++++++++ tests/run-test.py | 10 ++++++++-- 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/tests/fullstack-test/mpp/issue_2471.test b/tests/fullstack-test/mpp/issue_2471.test index ca8a7c23361..497ce605893 100644 --- a/tests/fullstack-test/mpp/issue_2471.test +++ b/tests/fullstack-test/mpp/issue_2471.test @@ -40,6 +40,10 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.display {#LINE} {#LINE} {#LINE} +{#LINE} +{#LINE} +{#LINE} +{#LINE} => DBGInvoke __disable_fail_point(exception_in_creating_set_input_stream) diff --git a/tests/fullstack-test/mpp/mpp_fail.test b/tests/fullstack-test/mpp/mpp_fail.test index 3f3002f59c5..e03c6150be6 100644 --- a/tests/fullstack-test/mpp/mpp_fail.test +++ b/tests/fullstack-test/mpp/mpp_fail.test @@ -75,6 +75,10 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText {#LINE} {#LINE} {#LINE} +{#LINE} +{#LINE} +{#LINE} +{#LINE} => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) ## exception during mpp run root task @@ -85,6 +89,10 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.display {#LINE} {#LINE} {#LINE} +{#LINE} +{#LINE} +{#LINE} +{#LINE} => DBGInvoke __disable_fail_point(exception_during_mpp_root_task_run) ## exception during mpp write err to tunnel @@ -97,6 +105,10 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText {#LINE} {#LINE} {#LINE} +{#LINE} +{#LINE} +{#LINE} +{#LINE} => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __disable_fail_point(exception_during_mpp_write_err_to_tunnel) @@ -108,6 +120,10 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 0, e.displayText {#LINE} {#LINE} {#LINE} +{#LINE} +{#LINE} +{#LINE} +{#LINE} => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __disable_fail_point(exception_during_mpp_close_tunnel) @@ -146,6 +162,10 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.display {#LINE} {#LINE} {#LINE} +{#LINE} +{#LINE} +{#LINE} +{#LINE} => DBGInvoke __disable_fail_point(exception_mpp_hash_build) # Clean up. diff --git a/tests/run-test.py b/tests/run-test.py index 0eee2f5c995..a2bcee0ce99 100644 --- a/tests/run-test.py +++ b/tests/run-test.py @@ -199,11 +199,14 @@ def matched(outputs, matches): b = MySQLCompare.parse_excepted_outputs(matches) return a == b else: - if len(outputs) != len(matches): + if len(outputs) > len(matches): return False for i in range(0, len(outputs)): if not compare_line(outputs[i], matches[i]): return False + for i in range(len(outputs), len(matches)): + if not compare_line("", matches[i]): + return False return True @@ -217,11 +220,14 @@ def matched(outputs, matches, fuzz): b = parse_table_parts(matches, fuzz) return a == b else: - if len(outputs) != len(matches): + if len(outputs) > len(matches): return False for i in range(0, len(outputs)): if not compare_line(outputs[i], matches[i]): return False + for i in range(len(outputs), len(matches)): + if not compare_line("", matches[i]): + return False return True