Skip to content

Commit

Permalink
Merge pull request #45097 from smorovic/141x-fix-contention
Browse files Browse the repository at this point in the history
(DAQ) fix lock contention in source
  • Loading branch information
cmsbuild authored May 31, 2024
2 parents 2ec2d68 + 46c71e0 commit 48a7298
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 11 deletions.
3 changes: 2 additions & 1 deletion EventFilter/Utilities/src/DAQSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1075,12 +1075,13 @@ void DAQSource::readWorker(unsigned int tid) {
workerPool_.push(tid);

if (init) {
std::unique_lock<std::mutex> lk(startupLock_);
std::unique_lock<std::mutex> lks(startupLock_);
init = false;
startupCv_.notify_one();
}
cvWakeup_.notify_all();
cvReader_[tid]->wait(lk);
lk.unlock();

if (thread_quit_signal[tid])
return;
Expand Down
20 changes: 11 additions & 9 deletions EventFilter/Utilities/src/EvFDaqDirector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <unistd.h>
#include <cstdio>
#include <boost/algorithm/string.hpp>
#include <fmt/printf.h>

//using boost::asio::ip::tcp;

Expand Down Expand Up @@ -1550,6 +1551,7 @@ namespace evf {
int maxLS) {
EvFDaqDirector::FileStatus fileStatus = noFile;
serverError = false;
std::string dest = fmt::sprintf(" on connection to %s:%s", fileBrokerHost_, fileBrokerPort_);

boost::system::error_code ec;
try {
Expand All @@ -1559,7 +1561,7 @@ namespace evf {
boost::asio::connect(*socket_, *endpoint_iterator_, ec);

if (ec) {
edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
serverError = true;
break;
}
Expand All @@ -1582,25 +1584,25 @@ namespace evf {
boost::asio::write(*socket_, request, ec);
if (ec) {
if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset" << dest;
//we got disconnected, try to reconnect to the server before writing the request
boost::asio::connect(*socket_, *endpoint_iterator_, ec);
if (ec) {
edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
serverError = true;
break;
}
continue;
}
edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec << dest;
serverError = true;
break;
}

boost::asio::streambuf response;
boost::asio::read_until(*socket_, response, "\r\n", ec);
if (ec) {
edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
serverError = true;
break;
}
Expand Down Expand Up @@ -1769,7 +1771,7 @@ namespace evf {
while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
}
if (ec != boost::asio::error::eof) {
edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
serverError = true;
}
}
Expand All @@ -1785,19 +1787,19 @@ namespace evf {
if (!fileBrokerKeepAlive_ && socket_->is_open()) {
socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
if (ec) {
edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec << dest;
}
socket_->close(ec);
if (ec) {
edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
}
}

if (serverError) {
if (socket_->is_open())
socket_->close(ec);
if (ec) {
edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
}
fileStatus = noFile;
sleep(1); //back-off if error detected
Expand Down
3 changes: 2 additions & 1 deletion EventFilter/Utilities/src/FedRawDataInputSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1300,12 +1300,13 @@ void FedRawDataInputSource::readWorker(unsigned int tid) {
workerPool_.push(tid);

if (init) {
std::unique_lock<std::mutex> lk(startupLock_);
std::unique_lock<std::mutex> lks(startupLock_);
init = false;
startupCv_.notify_one();
}
cvWakeup_.notify_all();
cvReader_[tid]->wait(lk);
lk.unlock();

if (thread_quit_signal[tid])
return;
Expand Down

0 comments on commit 48a7298

Please sign in to comment.