Skip to content

Commit

Permalink
add MPPTask::handleError() (#5202)
Browse files Browse the repository at this point in the history
ref #5095
  • Loading branch information
windtalker authored Jun 29, 2022
1 parent f84d7e3 commit 7a20339
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 89 deletions.
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_pt

/// can not cancel parent inputStream or the exception might be lost
if (!parent.executed)
/// kill the processor so ExchangeReceiver will be closed
parent.processor.cancel(true);
/// use cancel instead of kill to avoid too many useless error message
parent.processor.cancel(false);
}


Expand Down
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/UnionBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream
/// and the exception is lost.
output_queue.emplace(exception);
/// can not cancel itself or the exception might be lost
/// kill the processor so ExchangeReceiver will be closed
processor.cancel(true);
/// use cancel instead of kill to avoid too many useless error message
processor.cancel(false);
}

struct Handler
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::cancel()
{
setEndState(ExchangeReceiverState::CANCELED);
msg_channel.finish();
msg_channel.cancel();
}

template <typename RPCContext>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void MPPHandler::handleError(const MPPTaskPtr & task, String error)
try
{
if (task)
task->cancel(error);
task->handleError(error);
}
catch (...)
{
Expand Down
156 changes: 99 additions & 57 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,34 @@ MPPTask::~MPPTask()
LOG_FMT_DEBUG(log, "finish MPPTask: {}", id.toString());
}

void MPPTask::abortTunnels(const String & message, AbortType abort_type)
{
if (abort_type == AbortType::ONCANCELLATION)
{
closeAllTunnels(message);
}
else
{
RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set");
tunnel_set->writeError(message);
}
}

void MPPTask::abortReceivers()
{
if (likely(receiver_set != nullptr))
{
receiver_set->cancel();
}
}

void MPPTask::abortDataStreams(AbortType abort_type)
{
/// When abort type is ONERROR, it means MPPTask already known it meet error, so let the remaining task stop silently to avoid too many useless error message
bool is_kill = abort_type == AbortType::ONCANCELLATION;
context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, is_kill);
}

void MPPTask::closeAllTunnels(const String & reason)
{
if (likely(tunnel_set))
Expand Down Expand Up @@ -156,14 +184,6 @@ void MPPTask::initExchangeReceivers()
dag_context->setMPPReceiverSet(receiver_set);
}

void MPPTask::cancelAllReceivers()
{
if (likely(receiver_set != nullptr))
{
receiver_set->cancel();
}
}

std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConnectionRequest * request)
{
if (status == CANCELLED)
Expand Down Expand Up @@ -357,42 +377,43 @@ void MPPTask::runImpl()
return_statistics.blocks,
return_statistics.bytes);
}
catch (Exception & e)
{
err_msg = e.displayText();
LOG_FMT_ERROR(log, "task running meets error: {} Stack Trace : {}", err_msg, e.getStackTrace().toString());
}
catch (pingcap::Exception & e)
{
err_msg = e.message();
LOG_FMT_ERROR(log, "task running meets error: {}", err_msg);
}
catch (std::exception & e)
{
err_msg = e.what();
LOG_FMT_ERROR(log, "task running meets error: {}", err_msg);
}
catch (...)
{
err_msg = "unrecovered error";
LOG_FMT_ERROR(log, "task running meets error: {}", err_msg);
err_msg = getCurrentExceptionMessage(true);
}

if (err_msg.empty())
{
// todo when error happens, should try to update the metrics if it is available
auto throughput = dag_context->getTableScanThroughput();
if (throughput.first)
GET_METRIC(tiflash_storage_logical_throughput_bytes).Observe(throughput.second);
auto process_info = context->getProcessListElement()->getInfo();
auto peak_memory = process_info.peak_memory_usage > 0 ? process_info.peak_memory_usage : 0;
GET_METRIC(tiflash_coprocessor_request_memory_usage, type_run_mpp_task).Observe(peak_memory);
mpp_task_statistics.setMemoryPeak(peak_memory);
if (switchStatus(RUNNING, FINISHED))
LOG_INFO(log, "finish task");
else
LOG_FMT_WARNING(log, "finish task which is in {} state", taskStatusToString(status));
if (status == FINISHED)
{
// todo when error happens, should try to update the metrics if it is available
auto throughput = dag_context->getTableScanThroughput();
if (throughput.first)
GET_METRIC(tiflash_storage_logical_throughput_bytes).Observe(throughput.second);
auto process_info = context->getProcessListElement()->getInfo();
auto peak_memory = process_info.peak_memory_usage > 0 ? process_info.peak_memory_usage : 0;
GET_METRIC(tiflash_coprocessor_request_memory_usage, type_run_mpp_task).Observe(peak_memory);
mpp_task_statistics.setMemoryPeak(peak_memory);
}
}
else
{
context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true);
cancelAllReceivers();
writeErrToAllTunnels(err_msg);
if (status == RUNNING)
{
LOG_FMT_ERROR(log, "task running meets error: {}", err_msg);
try
{
handleError(err_msg);
}
catch (...)
{
tryLogCurrentException(log, "Meet error while try to handle error in MPPTask");
}
}
}
LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
// unregister flag is only for FailPoint usage, to produce the situation that MPPTask is destructed
Expand All @@ -405,52 +426,73 @@ void MPPTask::runImpl()
if (unregister)
unregisterTask();

if (switchStatus(RUNNING, FINISHED))
LOG_INFO(log, "finish task");
else
LOG_WARNING(log, "finish task which was cancelled before");

mpp_task_statistics.end(status.load(), err_msg);
mpp_task_statistics.end(status.load(), err_string);
mpp_task_statistics.logTracingJson();
}

void MPPTask::writeErrToAllTunnels(const String & e)
void MPPTask::handleError(const String & error_msg)
{
RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set");
tunnel_set->writeError(e);
if (manager == nullptr || !manager->isTaskToBeCancelled(id))
abort(error_msg, AbortType::ONERROR);
}

void MPPTask::cancel(const String & reason)
void MPPTask::abort(const String & message, AbortType abort_type)
{
CPUAffinityManager::getInstance().bindSelfQueryThread();
LOG_FMT_WARNING(log, "Begin cancel task: {}", id.toString());
String abort_type_string;
TaskStatus next_task_status;
switch (abort_type)
{
case AbortType::ONCANCELLATION:
abort_type_string = "ONCANCELLATION";
next_task_status = CANCELLED;
break;
case AbortType::ONERROR:
abort_type_string = "ONERROR";
next_task_status = FAILED;
break;
}
LOG_FMT_WARNING(log, "Begin abort task: {}, abort type: {}", id.toString(), abort_type_string);
while (true)
{
auto previous_status = status.load();
if (previous_status == FINISHED || previous_status == CANCELLED)
if (previous_status == FINISHED || previous_status == CANCELLED || previous_status == FAILED)
{
LOG_FMT_WARNING(log, "task already {}", (previous_status == FINISHED ? "finished" : "cancelled"));
LOG_FMT_WARNING(log, "task already in {} state", taskStatusToString(previous_status));
return;
}
else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, CANCELLED))
else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, next_task_status))
{
closeAllTunnels(reason);
err_string = message;
/// if the task is in initializing state, mpp task can return error to TiDB directly,
/// so just close all tunnels here
closeAllTunnels(message);
unregisterTask();
LOG_WARNING(log, "Finish cancel task from uninitialized");
LOG_WARNING(log, "Finish abort task from uninitialized");
return;
}
else if (previous_status == RUNNING && switchStatus(RUNNING, CANCELLED))
else if (previous_status == RUNNING && switchStatus(RUNNING, next_task_status))
{
/// abort the components from top to bottom because if bottom components are aborted
/// first, the top components may see an error caused by the abort, which is not
/// the original error
err_string = message;
abortTunnels(message, abort_type);
abortDataStreams(abort_type);
abortReceivers();
scheduleThisTask(ScheduleState::FAILED);
context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true);
closeAllTunnels(reason);
/// runImpl is running, leave remaining work to runImpl
LOG_WARNING(log, "Finish cancel task from running");
LOG_WARNING(log, "Finish abort task from running");
return;
}
}
}

void MPPTask::cancel(const String & reason)
{
CPUAffinityManager::getInstance().bindSelfQueryThread();
abort(reason, AbortType::ONCANCELLATION);
}

bool MPPTask::switchStatus(TaskStatus from, TaskStatus to)
{
return status.compare_exchange_strong(from, to);
Expand Down
21 changes: 15 additions & 6 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void cancel(const String & reason);

void handleError(const String & error_msg);

void prepare(const mpp::DispatchTaskRequest & task_request);

void run();
Expand Down Expand Up @@ -90,12 +92,22 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void unregisterTask();

void writeErrToAllTunnels(const String & e);

/// Similar to `writeErrToAllTunnels`, but it just try to write the error message to tunnel
/// without waiting the tunnel to be connected
void closeAllTunnels(const String & reason);

enum class AbortType
{
/// todo add ONKILL to distinguish between silent cancellation and kill
ONCANCELLATION,
ONERROR,
};
void abort(const String & message, AbortType abort_type);

void abortTunnels(const String & message, AbortType abort_type);
void abortReceivers();
void abortDataStreams(AbortType abort_type);

void finishWrite();

bool switchStatus(TaskStatus from, TaskStatus to);
Expand All @@ -110,8 +122,6 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void initExchangeReceivers();

void cancelAllReceivers();

tipb::DAGRequest dag_req;

ContextPtr context;
Expand All @@ -121,6 +131,7 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
MemoryTracker * memory_tracker = nullptr;

std::atomic<TaskStatus> status{INITIALIZING};
String err_string;

mpp::TaskMeta meta;

Expand All @@ -138,8 +149,6 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

MPPTaskStatistics mpp_task_statistics;

Exception err;

friend class MPPTaskManager;

int needed_threads;
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ bool MPPTaskManager::registerTask(MPPTaskPtr task)
return true;
}

bool MPPTaskManager::isTaskToBeCancelled(const MPPTaskId & task_id)
{
std::unique_lock lock(mu);
auto it = mpp_query_map.find(task_id.start_ts);
if (it != mpp_query_map.end() && it->second->to_be_cancelled)
{
return it->second->task_map.find(task_id) != it->second->task_map.end();
}
return false;
}

void MPPTaskManager::unregisterTask(MPPTask * task)
{
std::unique_lock lock(mu);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class MPPTaskManager : private boost::noncopyable

void unregisterTask(MPPTask * task);

bool isTaskToBeCancelled(const MPPTaskId & task_id);

bool tryToScheduleTask(const MPPTaskPtr & task);

void releaseThreadsFromScheduler(const int needed_threads);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/TaskStatus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ StringRef taskStatusToString(const TaskStatus & status)
return "FINISHED";
case CANCELLED:
return "CANCELLED";
case FAILED:
return "FAILED";
default:
throw Exception("Unknown TaskStatus");
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/TaskStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ enum TaskStatus
RUNNING,
FINISHED,
CANCELLED,
FAILED,
};

StringRef taskStatusToString(const TaskStatus & status);
Expand Down
10 changes: 9 additions & 1 deletion tests/fullstack-test/mpp/issue_2471.test
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,15 @@ mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_opt_bro
=> DBGInvoke __enable_fail_point(exception_in_creating_set_input_stream)

mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_opt_broadcast_cartesian_join=2; select * from a as t1 left join a as t2 on t1.id = t2.id;
ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_in_creating_set_input_stream is triggered.
ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_in_creating_set_input_stream is triggered., e.what() = DB::Exception, Stack trace:
{#LINE}
{#LINE}
{#LINE}
{#LINE}
{#LINE}
{#LINE}
{#LINE}
{#LINE}

=> DBGInvoke __disable_fail_point(exception_in_creating_set_input_stream)

Expand Down
Loading

0 comments on commit 7a20339

Please sign in to comment.