Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add MPPTask::handleError() #5202

Merged
merged 10 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_pt

/// can not cancel parent inputStream or the exception might be lost
if (!parent.executed)
/// kill the processor so ExchangeReceiver will be closed
parent.processor.cancel(true);
/// use cancel instead of kill to avoid too many useless error messages
parent.processor.cancel(false);
yibin87 marked this conversation as resolved.
Show resolved Hide resolved
}


Expand Down
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/UnionBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ class UnionBlockInputStream final : public IProfilingBlockInputStream
/// and the exception is lost.
output_queue.emplace(exception);
/// can not cancel itself or the exception might be lost
/// kill the processor so ExchangeReceiver will be closed
processor.cancel(true);
/// use cancel instead of kill to avoid too many useless error messages
processor.cancel(false);
}

struct Handler
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::cancel()
{
setEndState(ExchangeReceiverState::CANCELED);
msg_channel.finish();
msg_channel.cancel();
}

template <typename RPCContext>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void MPPHandler::handleError(const MPPTaskPtr & task, String error)
try
{
if (task)
task->cancel(error);
task->handleError(error);
}
catch (...)
{
Expand Down
156 changes: 99 additions & 57 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,34 @@ MPPTask::~MPPTask()
LOG_FMT_DEBUG(log, "finish MPPTask: {}", id.toString());
}

/// Abort the tunnels of this task.
/// On cancellation the tunnels are closed through closeAllTunnels(message);
/// for any other abort type the message is written to the tunnel set as an error.
void MPPTask::abortTunnels(const String & message, AbortType abort_type)
{
    if (abort_type != AbortType::ONCANCELLATION)
    {
        /// The error path needs a tunnel set to report the error through.
        RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set");
        tunnel_set->writeError(message);
        return;
    }
    closeAllTunnels(message);
}

/// Cancel the receiver set of this task; does nothing when there is no receiver set.
void MPPTask::abortReceivers()
{
    const bool has_receivers = receiver_set != nullptr;
    if (likely(has_receivers))
        receiver_set->cancel();
}

/// Send a cancel request for the current query to the process list.
/// The request is flagged as a kill only when the abort type is ONCANCELLATION.
void MPPTask::abortDataStreams(AbortType abort_type)
{
    /// When the abort type is ONERROR, the MPPTask already knows it has met an error, so the
    /// remaining work is stopped silently to avoid producing too many useless error messages.
    const bool kill_query = (abort_type == AbortType::ONCANCELLATION);
    auto & process_list = context->getProcessList();
    const auto & query_id = context->getCurrentQueryId();
    const auto & user = context->getClientInfo().current_user;
    process_list.sendCancelToQuery(query_id, user, kill_query);
}

void MPPTask::closeAllTunnels(const String & reason)
{
if (likely(tunnel_set))
Expand Down Expand Up @@ -156,14 +184,6 @@ void MPPTask::initExchangeReceivers()
dag_context->setMPPReceiverSet(receiver_set);
}

/// Cancel the receiver set of this task; does nothing when there is no receiver set.
void MPPTask::cancelAllReceivers()
{
    const bool has_receivers = receiver_set != nullptr;
    if (likely(has_receivers))
        receiver_set->cancel();
}

std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConnectionRequest * request)
{
if (status == CANCELLED)
Expand Down Expand Up @@ -357,42 +377,43 @@ void MPPTask::runImpl()
return_statistics.blocks,
return_statistics.bytes);
}
catch (Exception & e)
{
err_msg = e.displayText();
LOG_FMT_ERROR(log, "task running meets error: {} Stack Trace : {}", err_msg, e.getStackTrace().toString());
}
catch (pingcap::Exception & e)
{
err_msg = e.message();
LOG_FMT_ERROR(log, "task running meets error: {}", err_msg);
}
catch (std::exception & e)
{
err_msg = e.what();
LOG_FMT_ERROR(log, "task running meets error: {}", err_msg);
}
catch (...)
{
err_msg = "unrecovered error";
LOG_FMT_ERROR(log, "task running meets error: {}", err_msg);
err_msg = getCurrentExceptionMessage(true);
}

if (err_msg.empty())
{
// todo when error happens, should try to update the metrics if it is available
auto throughput = dag_context->getTableScanThroughput();
if (throughput.first)
GET_METRIC(tiflash_storage_logical_throughput_bytes).Observe(throughput.second);
auto process_info = context->getProcessListElement()->getInfo();
auto peak_memory = process_info.peak_memory_usage > 0 ? process_info.peak_memory_usage : 0;
GET_METRIC(tiflash_coprocessor_request_memory_usage, type_run_mpp_task).Observe(peak_memory);
mpp_task_statistics.setMemoryPeak(peak_memory);
if (switchStatus(RUNNING, FINISHED))
LOG_INFO(log, "finish task");
else
LOG_FMT_WARNING(log, "finish task which is in {} state", taskStatusToString(status));
if (status == FINISHED)
{
// todo when error happens, should try to update the metrics if it is available
auto throughput = dag_context->getTableScanThroughput();
if (throughput.first)
GET_METRIC(tiflash_storage_logical_throughput_bytes).Observe(throughput.second);
auto process_info = context->getProcessListElement()->getInfo();
auto peak_memory = process_info.peak_memory_usage > 0 ? process_info.peak_memory_usage : 0;
GET_METRIC(tiflash_coprocessor_request_memory_usage, type_run_mpp_task).Observe(peak_memory);
mpp_task_statistics.setMemoryPeak(peak_memory);
}
}
else
{
context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true);
cancelAllReceivers();
writeErrToAllTunnels(err_msg);
if (status == RUNNING)
{
LOG_FMT_ERROR(log, "task running meets error: {}", err_msg);
try
{
handleError(err_msg);
}
catch (...)
{
tryLogCurrentException(log, "Meet error while try to handle error in MPPTask");
}
}
}
LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
// unregister flag is only for FailPoint usage, to produce the situation that MPPTask is destructed
Expand All @@ -405,52 +426,73 @@ void MPPTask::runImpl()
if (unregister)
unregisterTask();

if (switchStatus(RUNNING, FINISHED))
LOG_INFO(log, "finish task");
else
LOG_WARNING(log, "finish task which was cancelled before");

mpp_task_statistics.end(status.load(), err_msg);
mpp_task_statistics.end(status.load(), err_string);
mpp_task_statistics.logTracingJson();
}

void MPPTask::writeErrToAllTunnels(const String & e)
void MPPTask::handleError(const String & error_msg)
{
RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set");
tunnel_set->writeError(e);
if (manager == nullptr || !manager->isTaskToBeCancelled(id))
abort(error_msg, AbortType::ONERROR);
}

void MPPTask::cancel(const String & reason)
void MPPTask::abort(const String & message, AbortType abort_type)
{
CPUAffinityManager::getInstance().bindSelfQueryThread();
LOG_FMT_WARNING(log, "Begin cancel task: {}", id.toString());
String abort_type_string;
TaskStatus next_task_status;
switch (abort_type)
{
case AbortType::ONCANCELLATION:
abort_type_string = "ONCANCELLATION";
next_task_status = CANCELLED;
break;
case AbortType::ONERROR:
abort_type_string = "ONERROR";
next_task_status = FAILED;
break;
}
LOG_FMT_WARNING(log, "Begin abort task: {}, abort type: {}", id.toString(), abort_type_string);
while (true)
{
auto previous_status = status.load();
if (previous_status == FINISHED || previous_status == CANCELLED)
if (previous_status == FINISHED || previous_status == CANCELLED || previous_status == FAILED)
{
LOG_FMT_WARNING(log, "task already {}", (previous_status == FINISHED ? "finished" : "cancelled"));
LOG_FMT_WARNING(log, "task already in {} state", taskStatusToString(previous_status));
return;
}
else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, CANCELLED))
else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, next_task_status))
{
closeAllTunnels(reason);
err_string = message;
/// if the task is in initializing state, mpp task can return error to TiDB directly,
/// so just close all tunnels here
closeAllTunnels(message);
unregisterTask();
LOG_WARNING(log, "Finish cancel task from uninitialized");
LOG_WARNING(log, "Finish abort task from uninitialized");
return;
}
else if (previous_status == RUNNING && switchStatus(RUNNING, CANCELLED))
else if (previous_status == RUNNING && switchStatus(RUNNING, next_task_status))
{
/// abort the components from top to bottom because if bottom components are aborted
/// first, the top components may see an error caused by the abort, which is not
/// the original error
err_string = message;
abortTunnels(message, abort_type);
abortDataStreams(abort_type);
abortReceivers();
scheduleThisTask(ScheduleState::FAILED);
context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true);
closeAllTunnels(reason);
/// runImpl is running, leave remaining work to runImpl
LOG_WARNING(log, "Finish cancel task from running");
LOG_WARNING(log, "Finish abort task from running");
return;
}
}
}

/// Cancel this task: abort it with AbortType::ONCANCELLATION, using `reason` as the message.
void MPPTask::cancel(const String & reason)
{
    /// Bind the calling thread through the CPU affinity manager before aborting.
    auto && affinity_manager = CPUAffinityManager::getInstance();
    affinity_manager.bindSelfQueryThread();
    abort(reason, AbortType::ONCANCELLATION);
}

bool MPPTask::switchStatus(TaskStatus from, TaskStatus to)
{
return status.compare_exchange_strong(from, to);
Expand Down
21 changes: 15 additions & 6 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void cancel(const String & reason);

void handleError(const String & error_msg);

void prepare(const mpp::DispatchTaskRequest & task_request);

void run();
Expand Down Expand Up @@ -90,12 +92,22 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void unregisterTask();

void writeErrToAllTunnels(const String & e);

/// Similar to `writeErrToAllTunnels`, but it just tries to write the error message to the tunnel
/// without waiting for the tunnel to be connected
void closeAllTunnels(const String & reason);

/// Reason a task is being aborted; passed to abort() and forwarded to the
/// abortTunnels() / abortDataStreams() helpers to select their behavior.
enum class AbortType
{
/// todo add ONKILL to distinguish between silent cancellation and kill
/// Used by cancel(): abort() switches the task status to CANCELLED.
ONCANCELLATION,
/// Used by handleError(): abort() switches the task status to FAILED.
ONERROR,
};
void abort(const String & message, AbortType abort_type);

void abortTunnels(const String & message, AbortType abort_type);
void abortReceivers();
void abortDataStreams(AbortType abort_type);

void finishWrite();

bool switchStatus(TaskStatus from, TaskStatus to);
Expand All @@ -110,8 +122,6 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void initExchangeReceivers();

void cancelAllReceivers();

tipb::DAGRequest dag_req;

ContextPtr context;
Expand All @@ -121,6 +131,7 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
MemoryTracker * memory_tracker = nullptr;

std::atomic<TaskStatus> status{INITIALIZING};
String err_string;

mpp::TaskMeta meta;

Expand All @@ -138,8 +149,6 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

MPPTaskStatistics mpp_task_statistics;

Exception err;

friend class MPPTaskManager;

int needed_threads;
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ bool MPPTaskManager::registerTask(MPPTaskPtr task)
return true;
}

/// Return true when the query that `task_id` belongs to (looked up by start_ts) is marked
/// as to-be-cancelled and `task_id` is still present in that query's task map.
/// The lookup is done while holding `mu`.
bool MPPTaskManager::isTaskToBeCancelled(const MPPTaskId & task_id)
{
    std::unique_lock lock(mu);

    const auto query_it = mpp_query_map.find(task_id.start_ts);
    if (query_it == mpp_query_map.end())
        return false;

    const auto & query_task_set = query_it->second;
    if (!query_task_set->to_be_cancelled)
        return false;

    const auto task_it = query_task_set->task_map.find(task_id);
    return task_it != query_task_set->task_map.end();
}

void MPPTaskManager::unregisterTask(MPPTask * task)
{
std::unique_lock lock(mu);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class MPPTaskManager : private boost::noncopyable

void unregisterTask(MPPTask * task);

bool isTaskToBeCancelled(const MPPTaskId & task_id);

bool tryToScheduleTask(const MPPTaskPtr & task);

void releaseThreadsFromScheduler(const int needed_threads);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/TaskStatus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ StringRef taskStatusToString(const TaskStatus & status)
return "FINISHED";
case CANCELLED:
return "CANCELLED";
case FAILED:
return "FAILED";
default:
throw Exception("Unknown TaskStatus");
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/TaskStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ enum TaskStatus
RUNNING,
FINISHED,
CANCELLED,
FAILED,
};

StringRef taskStatusToString(const TaskStatus & status);
Expand Down
10 changes: 9 additions & 1 deletion tests/fullstack-test/mpp/issue_2471.test
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,15 @@ mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_opt_bro
=> DBGInvoke __enable_fail_point(exception_in_creating_set_input_stream)

mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_opt_broadcast_cartesian_join=2; select * from a as t1 left join a as t2 on t1.id = t2.id;
ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_in_creating_set_input_stream is triggered.
ERROR 1105 (HY000) at line 1: other error for mpp stream: Code: 10007, e.displayText() = DB::Exception: Fail point FailPoints::exception_in_creating_set_input_stream is triggered., e.what() = DB::Exception, Stack trace:
{#LINE}
{#LINE}
{#LINE}
{#LINE}
{#LINE}
{#LINE}
{#LINE}
{#LINE}

=> DBGInvoke __disable_fail_point(exception_in_creating_set_input_stream)

Expand Down
Loading