Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(DAQ) fix lock contention in source #45097

Merged
merged 1 commit into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion EventFilter/Utilities/src/DAQSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1075,12 +1075,13 @@ void DAQSource::readWorker(unsigned int tid) {
workerPool_.push(tid);

if (init) {
std::unique_lock<std::mutex> lk(startupLock_);
std::unique_lock<std::mutex> lks(startupLock_);
init = false;
startupCv_.notify_one();
}
cvWakeup_.notify_all();
cvReader_[tid]->wait(lk);
lk.unlock();

if (thread_quit_signal[tid])
return;
Expand Down
20 changes: 11 additions & 9 deletions EventFilter/Utilities/src/EvFDaqDirector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <unistd.h>
#include <cstdio>
#include <boost/algorithm/string.hpp>
#include <fmt/printf.h>

//using boost::asio::ip::tcp;

Expand Down Expand Up @@ -1550,6 +1551,7 @@ namespace evf {
int maxLS) {
EvFDaqDirector::FileStatus fileStatus = noFile;
serverError = false;
std::string dest = fmt::sprintf(" on connection to %s:%s", fileBrokerHost_, fileBrokerPort_);

boost::system::error_code ec;
try {
Expand All @@ -1559,7 +1561,7 @@ namespace evf {
boost::asio::connect(*socket_, *endpoint_iterator_, ec);

if (ec) {
edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
serverError = true;
break;
}
Expand All @@ -1582,25 +1584,25 @@ namespace evf {
boost::asio::write(*socket_, request, ec);
if (ec) {
if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset" << dest;
//we got disconnected, try to reconnect to the server before writing the request
boost::asio::connect(*socket_, *endpoint_iterator_, ec);
if (ec) {
edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
serverError = true;
break;
}
continue;
}
edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec << dest;
serverError = true;
break;
}

boost::asio::streambuf response;
boost::asio::read_until(*socket_, response, "\r\n", ec);
if (ec) {
edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
serverError = true;
break;
}
Expand Down Expand Up @@ -1769,7 +1771,7 @@ namespace evf {
while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
}
if (ec != boost::asio::error::eof) {
edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
serverError = true;
}
}
Expand All @@ -1785,19 +1787,19 @@ namespace evf {
if (!fileBrokerKeepAlive_ && socket_->is_open()) {
socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
if (ec) {
edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec << dest;
}
socket_->close(ec);
if (ec) {
edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
}
}

if (serverError) {
if (socket_->is_open())
socket_->close(ec);
if (ec) {
edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
}
fileStatus = noFile;
sleep(1); //back-off if error detected
Expand Down
3 changes: 2 additions & 1 deletion EventFilter/Utilities/src/FedRawDataInputSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1300,12 +1300,13 @@ void FedRawDataInputSource::readWorker(unsigned int tid) {
workerPool_.push(tid);

if (init) {
std::unique_lock<std::mutex> lk(startupLock_);
std::unique_lock<std::mutex> lks(startupLock_);
init = false;
startupCv_.notify_one();
}
cvWakeup_.notify_all();
cvReader_[tid]->wait(lk);
lk.unlock();

if (thread_quit_signal[tid])
return;
Expand Down