Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data race in tunnel_set/receiver_set and establish call data (#5650) #5652

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,12 @@ void DAGContext::addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_r
{
if (!isMPPTask())
return;
RUNTIME_ASSERT(mpp_receiver_set != nullptr, log, "MPPTask without receiver set");
return mpp_receiver_set->addCoprocessorReader(coprocessor_reader);
coprocessor_readers.push_back(coprocessor_reader);
}

std::vector<CoprocessorReaderPtr> & DAGContext::getCoprocessorReaders()
{
return coprocessor_readers;
}

bool DAGContext::containsRegionsInfoForTable(Int64 table_id) const
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ class DAGContext
mpp_receiver_set = receiver_set;
}
void addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader);
std::vector<CoprocessorReaderPtr> & getCoprocessorReaders();

void addSubquery(const String & subquery_id, SubqueryForSet && subquery);
bool hasSubquery() const { return !subqueries.empty(); }
Expand Down Expand Up @@ -388,6 +389,7 @@ class DAGContext
std::atomic<UInt64> warning_count;

MPPReceiverSetPtr mpp_receiver_set;
std::vector<CoprocessorReaderPtr> coprocessor_readers;
/// vector of SubqueriesForSets(such as join build subquery).
/// The order of the vector is also the order of the subquery.
std::vector<SubqueriesForSets> subqueries;
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,8 @@ bool EstablishCallData::write(const mpp::MPPDataPacket & packet)
void EstablishCallData::writeErr(const mpp::MPPDataPacket & packet)
{
state = ERR_HANDLE;
if (write(packet))
err_status = grpc::Status::OK;
else
err_status = grpc::Status(grpc::StatusCode::UNKNOWN, "Write error message failed for unknown reason.");
err_status = grpc::Status::OK;
write(packet);
}

void EstablishCallData::setFinishState(const String & msg)
Expand Down
47 changes: 38 additions & 9 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ void MPPTask::abortTunnels(const String & message, AbortType abort_type)

void MPPTask::abortReceivers()
{
if (likely(receiver_set != nullptr))
{
receiver_set->cancel();
std::unique_lock lock(tunnel_and_receiver_mu);
if unlikely (receiver_set == nullptr)
return;
}
receiver_set->cancel();
}

void MPPTask::abortDataStreams(AbortType abort_type)
Expand All @@ -111,8 +113,12 @@ void MPPTask::abortDataStreams(AbortType abort_type)

void MPPTask::closeAllTunnels(const String & reason)
{
if (likely(tunnel_set))
tunnel_set->close(reason);
{
std::unique_lock lock(tunnel_and_receiver_mu);
if (unlikely(tunnel_set == nullptr))
return;
}
tunnel_set->close(reason);
}

void MPPTask::finishWrite()
Expand All @@ -128,7 +134,7 @@ void MPPTask::run()

void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
{
tunnel_set = std::make_shared<MPPTunnelSet>(log->identifier());
auto tunnel_set_local = std::make_shared<MPPTunnelSet>(log->identifier());
std::chrono::seconds timeout(task_request.timeout());
const auto & exchange_sender = dag_req.root_executor().exchange_sender();

Expand All @@ -144,17 +150,24 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
LOG_FMT_DEBUG(log, "begin to register the tunnel {}", tunnel->id());
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnel {} can not be registered, because the task is not in initializing state", tunnel->id()));
tunnel_set->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel);
tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel);
if (!dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task);
}
}
{
std::unique_lock lock(tunnel_and_receiver_mu);
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnels can not be registered, because the task is not in initializing state"));
tunnel_set = std::move(tunnel_set_local);
}
dag_context->tunnel_set = tunnel_set;
}

void MPPTask::initExchangeReceivers()
{
receiver_set = std::make_shared<MPPReceiverSet>(log->identifier());
auto receiver_set_local = std::make_shared<MPPReceiverSet>(log->identifier());
traverseExecutors(&dag_req, [&](const tipb::Executor & executor) {
if (executor.tp() == tipb::ExecType::TypeExchangeReceiver)
{
Expand All @@ -177,11 +190,17 @@ void MPPTask::initExchangeReceivers()
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");

receiver_set->addExchangeReceiver(executor_id, exchange_receiver);
receiver_set_local->addExchangeReceiver(executor_id, exchange_receiver);
new_thread_count_of_exchange_receiver += exchange_receiver->computeNewThreadCount();
}
return true;
});
{
std::unique_lock lock(tunnel_and_receiver_mu);
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");
receiver_set = std::move(receiver_set_local);
}
dag_context->setMPPReceiverSet(receiver_set);
}

Expand Down Expand Up @@ -292,7 +311,6 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
// register tunnels
registerTunnels(task_request);

dag_context->tunnel_set = tunnel_set;
// register task.
auto task_manager = tmt_context.getMPPTaskManager();
LOG_FMT_DEBUG(log, "begin to register the task {}", id.toString());
Expand All @@ -318,8 +336,19 @@ void MPPTask::preprocess()
{
auto start_time = Clock::now();
initExchangeReceivers();
<<<<<<< HEAD
DAGQuerySource dag(*context);
executeQuery(dag, *context, false, QueryProcessingStage::Complete);
=======
executeQuery(*context);
{
std::unique_lock lock(tunnel_and_receiver_mu);
if (status != RUNNING)
throw Exception("task not in running state, may be cancelled");
for (auto & r : dag_context->getCoprocessorReaders())
receiver_set->addCoprocessorReader(r);
}
>>>>>>> 7428d3728f (Fix data race in tunnel_set/receiver_set and establish call data (#5650))
auto end_time = Clock::now();
dag_context->compile_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count();
mpp_task_statistics.setCompileTimestamp(start_time, end_time);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

MPPTaskId id;

std::mutex tunnel_and_receiver_mu;

MPPTunnelSetPtr tunnel_set;

MPPReceiverSetPtr receiver_set;
Expand Down
15 changes: 1 addition & 14 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,7 @@ MPPTunnel::~MPPTunnel()
});
try
{
{
std::unique_lock lock(*mu);
if (status == TunnelStatus::Finished)
{
LOG_DEBUG(log, "already finished!");
return;
}

/// make sure to finish the tunnel after it is connected
waitUntilConnectedOrFinished(lock);
finishSendQueue();
}
LOG_FMT_TRACE(log, "waiting consumer finish!");
waitForSenderFinish(/*allow_throw=*/false);
close("");
}
catch (...)
{
Expand Down