Skip to content

Commit

Permalink
Merge pull request #29445 from Dr15Jones/interProcGenerator
Browse files Browse the repository at this point in the history
Run GeneratorFilter modules in an external process controlled by cmsRun
  • Loading branch information
cmsbuild authored Apr 21, 2020
2 parents 6b5b453 + ee434e6 commit 8869156
Show file tree
Hide file tree
Showing 34 changed files with 1,412 additions and 87 deletions.
2 changes: 1 addition & 1 deletion FWCore/Integration/bin/interprocess.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ int main(int argc, char* argv[]) {
//This class is holding the lock
WorkerChannel communicationChannel(memoryName, uniqueID);

WriteBuffer sm_buffer{memoryName, communicationChannel.fromWorkerBufferIndex()};
WriteBuffer sm_buffer{memoryName, communicationChannel.fromWorkerBufferInfo()};
int counter = 0;

//The lock must be released if there is a catastrophic signal
Expand Down
4 changes: 2 additions & 2 deletions FWCore/Integration/bin/interprocess_random.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ int main(int argc, char* argv[]) {
//This class is holding the lock
WorkerChannel communicationChannel(memoryName, uniqueID);

WriteBuffer sm_buffer{memoryName, communicationChannel.fromWorkerBufferIndex()};
ReadBuffer sm_readbuffer{std::string("Rand") + memoryName, communicationChannel.toWorkerBufferIndex()};
WriteBuffer sm_buffer{memoryName, communicationChannel.fromWorkerBufferInfo()};
ReadBuffer sm_readbuffer{std::string("Rand") + memoryName, communicationChannel.toWorkerBufferInfo()};
int counter = 0;

//The lock must be released if there is a catastrophic signal
Expand Down
2 changes: 1 addition & 1 deletion FWCore/Integration/test/TestInterProcessProd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace testinter {
StreamCache(const std::string& iConfig, int id)
: id_{id},
channel_("testProd", id_),
readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferIndex()},
readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferInfo()},
deserializer_{readBuffer_},
br_deserializer_{readBuffer_},
er_deserializer_{readBuffer_},
Expand Down
4 changes: 2 additions & 2 deletions FWCore/Integration/test/TestInterProcessRandomProd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ namespace testinter {
StreamCache(const std::string& iConfig, int id)
: id_{id},
channel_("testProd", id_),
readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferIndex()},
writeBuffer_{std::string("Rand") + channel_.sharedMemoryName(), channel_.toWorkerBufferIndex()},
readBuffer_{channel_.sharedMemoryName(), channel_.fromWorkerBufferInfo()},
writeBuffer_{std::string("Rand") + channel_.sharedMemoryName(), channel_.toWorkerBufferInfo()},
deserializer_{readBuffer_},
bl_deserializer_{readBuffer_},
randSerializer_{writeBuffer_} {
Expand Down
33 changes: 33 additions & 0 deletions FWCore/SharedMemory/interface/BufferInfo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#ifndef FWCore_SharedMemory_BufferInfo_h
#define FWCore_SharedMemory_BufferInfo_h
// -*- C++ -*-
//
// Package: FWCore/SharedMemory
// Class : BufferInfo
//
/**\class BufferInfo BufferInfo.h " FWCore/SharedMemory/interface/BufferInfo.h"
Description: Information needed to manage the buffer
Usage:
This is an internal detail of the system.
*/
//
// Original Author: Chris Jones
// Created: 21/01/2020
//

// system include files

// user include files

// forward declarations

namespace edm::shared_memory {
struct BufferInfo {
int identifier_;
char index_;
};
} // namespace edm::shared_memory

#endif
13 changes: 7 additions & 6 deletions FWCore/SharedMemory/interface/ControllerChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
// user include files
#include "FWCore/Utilities/interface/Transition.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/SharedMemory/interface/BufferInfo.h"

// forward declarations

Expand All @@ -48,7 +49,7 @@ namespace edm::shared_memory {
// ---------- member functions ---------------------------

/** setupWorker must be called only once and done before any calls to doTransition. The functor iF should setup values associated
with shared memory use, such as manipulating the value from toWorkerBufferIndex(). The call to setupWorker proper synchronizes
with shared memory use, such as manipulating the value from toWorkerBufferInfo(). The call to setupWorker proper synchronizes
the Controller and Worker processes.
*/
template <typename F>
Expand Down Expand Up @@ -83,9 +84,9 @@ namespace edm::shared_memory {
}

///This can be used with WriteBuffer to keep Controller and Worker in sync
char* toWorkerBufferIndex() { return toWorkerBufferIndex_; }
BufferInfo* toWorkerBufferInfo() { return toWorkerBufferInfo_; }
///This can be used with ReadBuffer to keep Controller and Worker in sync
char* fromWorkerBufferIndex() { return fromWorkerBufferIndex_; }
BufferInfo* fromWorkerBufferInfo() { return fromWorkerBufferInfo_; }

void stopWorker() {
//std::cout <<"stopWorker"<<std::endl;
Expand All @@ -104,7 +105,7 @@ namespace edm::shared_memory {
bool shouldKeepEvent() const { return *keepEvent_; }

private:
static char* bufferIndex(const char* iWhich, boost::interprocess::managed_shared_memory& mem);
static BufferInfo* bufferInfo(const char* iWhich, boost::interprocess::managed_shared_memory& mem);

std::string uniqueName(std::string iBase) const;

Expand All @@ -116,8 +117,8 @@ namespace edm::shared_memory {
int id_;
std::string smName_;
boost::interprocess::managed_shared_memory managed_sm_;
char* toWorkerBufferIndex_;
char* fromWorkerBufferIndex_;
BufferInfo* toWorkerBufferInfo_;
BufferInfo* fromWorkerBufferInfo_;

boost::interprocess::named_mutex mutex_;
boost::interprocess::named_condition cndFromMain_;
Expand Down
4 changes: 3 additions & 1 deletion FWCore/SharedMemory/interface/ROOTDeserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ namespace edm::shared_memory {
// ---------- member functions ---------------------------
T deserialize() {
T value;
if (buffer_.mustGetBufferAgain()) {
if (previousBufferIdentifier_ != buffer_.bufferIdentifier()) {
auto buff = buffer_.buffer();
bufferFile_.SetBuffer(buff.first, buff.second, kFALSE);
previousBufferIdentifier_ = buffer_.bufferIdentifier();
}

class_->ReadBuffer(bufferFile_, &value);
Expand All @@ -58,6 +59,7 @@ namespace edm::shared_memory {
READBUFFER& buffer_;
TClass* const class_;
TBufferFile bufferFile_;
int previousBufferIdentifier_ = 0;
};
} // namespace edm::shared_memory

Expand Down
18 changes: 10 additions & 8 deletions FWCore/SharedMemory/interface/ReadBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ This works in conjunction with WriteBuffer.

// user include files
#include "FWCore/SharedMemory/interface/buffer_names.h"
#include "FWCore/SharedMemory/interface/BufferInfo.h"

// forward declarations

namespace edm::shared_memory {
class ReadBuffer {
public:
/** iUniqueName : must be unique for all processes running on a system.
iBufferIndex : is a pointer to a shared_memory address where the same address needs to be shared by ReadBuffer and WriteBuffer.
iBufferInfo : is a pointer to a shared_memory address where the same address needs to be shared by ReadBuffer and WriteBuffer.
*/
ReadBuffer(std::string const& iUniqueName, char* iBufferIndex)
: buffer_{nullptr, 0}, bufferIndex_{iBufferIndex}, bufferOldIndex_{3} {
*bufferIndex_ = 0;
ReadBuffer(std::string const& iUniqueName, BufferInfo* iBufferInfo)
: buffer_{nullptr, 0}, bufferInfo_{iBufferInfo}, bufferOldIndex_{3} {
*bufferInfo_ = {0, 0};
bufferNames_[0] = iUniqueName + buffer_names::kBuffer0;
bufferNames_[1] = iUniqueName + buffer_names::kBuffer1;
}
Expand All @@ -47,23 +48,24 @@ namespace edm::shared_memory {
const ReadBuffer& operator=(ReadBuffer&&) = delete;

// ---------- const member functions ---------------------
bool mustGetBufferAgain() const { return *bufferIndex_ != bufferOldIndex_; }
int bufferIdentifier() const { return bufferInfo_->identifier_; }

// ---------- member functions ---------------------------
std::pair<char*, std::size_t> buffer() {
if (mustGetBufferAgain()) {
using namespace boost::interprocess;
sm_ = std::make_unique<managed_shared_memory>(open_only, bufferNames_[*bufferIndex_].c_str());
sm_ = std::make_unique<managed_shared_memory>(open_only, bufferNames_[bufferInfo_->index_].c_str());
buffer_ = sm_->find<char>(buffer_names::kBuffer);
bufferOldIndex_ = *bufferIndex_;
bufferOldIndex_ = bufferInfo_->index_;
}
return buffer_;
}

private:
bool mustGetBufferAgain() const { return bufferInfo_->index_ != bufferOldIndex_; }
// ---------- member data --------------------------------
std::pair<char*, std::size_t> buffer_;
char* bufferIndex_;
BufferInfo* bufferInfo_;
char bufferOldIndex_;
std::array<std::string, 2> bufferNames_;
std::unique_ptr<boost::interprocess::managed_shared_memory> sm_;
Expand Down
9 changes: 5 additions & 4 deletions FWCore/SharedMemory/interface/WorkerChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

// user include files
#include "FWCore/Utilities/interface/Transition.h"
#include "FWCore/SharedMemory/interface/BufferInfo.h"

// forward declarations

Expand All @@ -47,9 +48,9 @@ namespace edm::shared_memory {
boost::interprocess::scoped_lock<boost::interprocess::named_mutex>* accessLock() { return &lock_; }

///This can be used with ReadBuffer to keep Controller and Worker in sync
char* toWorkerBufferIndex() { return toWorkerBufferIndex_; }
BufferInfo* toWorkerBufferInfo() { return toWorkerBufferInfo_; }
///This can be used with WriteBuffer to keep Controller and Worker in sync
char* fromWorkerBufferIndex() { return fromWorkerBufferIndex_; }
BufferInfo* fromWorkerBufferInfo() { return fromWorkerBufferInfo_; }

///Matches the ControllerChannel::setupWorker call
void workerSetupDone() {
Expand Down Expand Up @@ -94,8 +95,8 @@ namespace edm::shared_memory {
bool* stop_;
edm::Transition* transitionType_;
unsigned long long* transitionID_;
char* toWorkerBufferIndex_;
char* fromWorkerBufferIndex_;
BufferInfo* toWorkerBufferInfo_;
BufferInfo* fromWorkerBufferInfo_;
boost::interprocess::named_condition cndToController_;
bool* keepEvent_;
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock_;
Expand Down
11 changes: 6 additions & 5 deletions FWCore/SharedMemory/interface/WriteBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,22 @@

// user include files
#include "FWCore/SharedMemory/interface/buffer_names.h"
#include "FWCore/SharedMemory/interface/BufferInfo.h"

// forward declarations

namespace edm::shared_memory {
class WriteBuffer {
public:
/** iUniqueName : must be unique for all processes running on a system.
iBufferIndex : is a pointer to a shared_memory address where the same address needs to be shared by ReadBuffer and WriteBuffer.
iBufferInfo : is a pointer to a shared_memory address where the same address needs to be shared by ReadBuffer and WriteBuffer.
*/

WriteBuffer(std::string const& iUniqueName, char* iBufferIndex)
: bufferSize_{0}, buffer_{nullptr}, bufferIndex_{iBufferIndex} {
WriteBuffer(std::string const& iUniqueName, BufferInfo* iBufferInfo)
: bufferSize_{0}, buffer_{nullptr}, bufferInfo_{iBufferInfo} {
bufferNames_[0] = iUniqueName + buffer_names::kBuffer0;
bufferNames_[1] = iUniqueName + buffer_names::kBuffer1;
assert(bufferIndex_);
assert(bufferInfo_);
}
WriteBuffer(const WriteBuffer&) = delete;
const WriteBuffer& operator=(const WriteBuffer&) = delete;
Expand All @@ -66,7 +67,7 @@ namespace edm::shared_memory {
// ---------- member data --------------------------------
std::size_t bufferSize_;
char* buffer_;
char* bufferIndex_;
BufferInfo* bufferInfo_;
std::array<std::string, 2> bufferNames_;
std::unique_ptr<boost::interprocess::managed_shared_memory> sm_;
};
Expand Down
4 changes: 2 additions & 2 deletions FWCore/SharedMemory/interface/channel_names.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

namespace edm::shared_memory {
namespace channel_names {
constexpr char const* const kToWorkerBufferIndex = "bufferIndexToWorker";
constexpr char const* const kFromWorkerBufferIndex = "bufferIndexFromWorker";
constexpr char const* const kToWorkerBufferInfo = "bufferInfoToWorker";
constexpr char const* const kFromWorkerBufferInfo = "bufferInfoFromWorker";
constexpr char const* const kMutex = "mtx";
constexpr char const* const kConditionFromMain = "cndFromMain";
constexpr char const* const kConditionToMain = "cndToMain";
Expand Down
14 changes: 7 additions & 7 deletions FWCore/SharedMemory/src/ControllerChannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ ControllerChannel::ControllerChannel(std::string const& iName, int id)
: id_{id},
smName_{uniqueName(iName)},
managed_sm_{open_or_create, smName_.c_str(), 1024},
toWorkerBufferIndex_{bufferIndex(channel_names::kToWorkerBufferIndex, managed_sm_)},
fromWorkerBufferIndex_{bufferIndex(channel_names::kFromWorkerBufferIndex, managed_sm_)},
toWorkerBufferInfo_{bufferInfo(channel_names::kToWorkerBufferInfo, managed_sm_)},
fromWorkerBufferInfo_{bufferInfo(channel_names::kFromWorkerBufferInfo, managed_sm_)},
mutex_{open_or_create, uniqueName(channel_names::kMutex).c_str()},
cndFromMain_{open_or_create, uniqueName(channel_names::kConditionFromMain).c_str()},
cndToMain_{open_or_create, uniqueName(channel_names::kConditionToMain).c_str()} {
Expand All @@ -61,8 +61,8 @@ ControllerChannel::~ControllerChannel() {
managed_sm_.destroy<bool>(channel_names::kStop);
managed_sm_.destroy<unsigned int>(channel_names::kTransitionType);
managed_sm_.destroy<unsigned long long>(channel_names::kTransitionID);
managed_sm_.destroy<char>(channel_names::kToWorkerBufferIndex);
managed_sm_.destroy<char>(channel_names::kFromWorkerBufferIndex);
managed_sm_.destroy<BufferInfo>(channel_names::kToWorkerBufferInfo);
managed_sm_.destroy<BufferInfo>(channel_names::kFromWorkerBufferInfo);

named_mutex::remove(uniqueName(channel_names::kMutex).c_str());
named_condition::remove(uniqueName(channel_names::kConditionFromMain).c_str());
Expand Down Expand Up @@ -103,8 +103,8 @@ bool ControllerChannel::wait(scoped_lock<named_mutex>& lock, edm::Transition iTr
//
// static member functions
//
char* ControllerChannel::bufferIndex(const char* iWhich, managed_shared_memory& mem) {
mem.destroy<char>(iWhich);
char* v = mem.construct<char>(iWhich)();
BufferInfo* ControllerChannel::bufferInfo(const char* iWhich, managed_shared_memory& mem) {
mem.destroy<BufferInfo>(iWhich);
BufferInfo* v = mem.construct<BufferInfo>(iWhich)();
return v;
}
8 changes: 4 additions & 4 deletions FWCore/SharedMemory/src/WorkerChannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ WorkerChannel::WorkerChannel(std::string const& iName, const std::string& iUniqu
stop_{managed_shm_.find<bool>(channel_names::kStop).first},
transitionType_{managed_shm_.find<edm::Transition>(channel_names::kTransitionType).first},
transitionID_{managed_shm_.find<unsigned long long>(channel_names::kTransitionID).first},
toWorkerBufferIndex_{managed_shm_.find<char>(channel_names::kToWorkerBufferIndex).first},
fromWorkerBufferIndex_{managed_shm_.find<char>(channel_names::kFromWorkerBufferIndex).first},
toWorkerBufferInfo_{managed_shm_.find<BufferInfo>(channel_names::kToWorkerBufferInfo).first},
fromWorkerBufferInfo_{managed_shm_.find<BufferInfo>(channel_names::kFromWorkerBufferInfo).first},
cndToController_{open_or_create, unique_name(channel_names::kConditionToMain, iUniqueID).c_str()},
keepEvent_{managed_shm_.find<bool>(channel_names::kKeepEvent).first},
lock_{mutex_} {
assert(stop_);
assert(transitionType_);
assert(transitionID_);
assert(toWorkerBufferIndex_);
assert(fromWorkerBufferIndex_);
assert(toWorkerBufferInfo_);
assert(fromWorkerBufferInfo_);
}

//
Expand Down
9 changes: 5 additions & 4 deletions FWCore/SharedMemory/src/WriteBuffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,25 @@ WriteBuffer::~WriteBuffer() {
if (sm_) {
sm_->destroy<char>(buffer_names::kBuffer);
sm_.reset();
boost::interprocess::shared_memory_object::remove(bufferNames_[*bufferIndex_].c_str());
boost::interprocess::shared_memory_object::remove(bufferNames_[bufferInfo_->index_].c_str());
}
}
//
// member functions
//
void WriteBuffer::growBuffer(std::size_t iLength) {
int newBuffer = (*bufferIndex_ + 1) % 2;
int newBuffer = (bufferInfo_->index_ + 1) % 2;
if (sm_) {
sm_->destroy<char>(buffer_names::kBuffer);
sm_.reset();
boost::interprocess::shared_memory_object::remove(bufferNames_[*bufferIndex_].c_str());
boost::interprocess::shared_memory_object::remove(bufferNames_[bufferInfo_->index_].c_str());
}
sm_ = std::make_unique<boost::interprocess::managed_shared_memory>(
boost::interprocess::open_or_create, bufferNames_[newBuffer].c_str(), iLength + 1024);
assert(sm_.get());
bufferSize_ = iLength;
*bufferIndex_ = newBuffer;
bufferInfo_->index_ = newBuffer;
bufferInfo_->identifier_ = bufferInfo_->identifier_ + 1;
buffer_ = sm_->construct<char>(buffer_names::kBuffer)[iLength](0);
assert(buffer_);
}
Expand Down
Loading

0 comments on commit 8869156

Please sign in to comment.