diff --git a/lib/vistle/insitu/module/CMakeLists.txt b/lib/vistle/insitu/module/CMakeLists.txt index bab36ae06..66aa538f0 100644 --- a/lib/vistle/insitu/module/CMakeLists.txt +++ b/lib/vistle/insitu/module/CMakeLists.txt @@ -1,8 +1,2 @@ -set(LIBSIM_SOURCES inSituModule.cpp inSituReader.cpp) - -set(LIBSIM_HEADER inSituModule.h inSituReader.h export.h) - -vistle_add_library(vistle_insitu_module EXPORT ${LIBSIM_SOURCES} ${LIBSIM_HEADER}) +vistle_add_library(vistle_insitu_module EXPORT export.h inSituModule.h inSituModule.cpp) vistle_target_link_libraries(vistle_insitu_module PRIVATE vistle_module vistle_insitu_core PUBLIC vistle_insitu_message) - -vistle_install_docs(README.md) diff --git a/lib/vistle/insitu/module/README.md b/lib/vistle/insitu/module/README.md deleted file mode 100644 index f70968a0e..000000000 --- a/lib/vistle/insitu/module/README.md +++ /dev/null @@ -1,9 +0,0 @@ -InSituReader -===================================================== -Base class for modules that connect to external processes(e.g simulations) and are continuously reading data during execute. -After beginExecute() is called the module must not create vistle objects until endExecute() returns. -Vice versa the external process must create its vistle objects after beginExecute() and before endExecute() returns. -Use insitu::message::SyncShmIDs::createVistleObject (initialized with with instance = InstanceNum() of this module) to create vistle objects in the external process. -Use a vistle::message_queue with name "recvFromSim" + InstanceNum() in the external process to send the vistle objects to this module which will send them with its own signature to the manager. - - diff --git a/lib/vistle/insitu/module/inSituReader.cpp b/lib/vistle/insitu/module/inSituReader.cpp deleted file mode 100644 index 7775ddd20..000000000 --- a/lib/vistle/insitu/module/inSituReader.cpp +++ /dev/null @@ -1,164 +0,0 @@ -#include "inSituReader.h" -#include -#include -#include - -using namespace vistle; -using namespace vistle::insitu; -using std::endl; -#define CERR std::cerr << "inSituReader[" << rank() << "/" << size() << "] " - -size_t InSituReader::m_numInstances = 0; - -InSituReader::InSituReader(const std::string &name, const int moduleID, mpi::communicator comm) -: Module(name, moduleID, comm) -{ - setReducePolicy(vistle::message::ReducePolicy::OverAll); -} - -bool InSituReader::isExecuting() -{ - return m_isExecuting; -} - -bool InSituReader::handleExecute(const vistle::message::Execute *exec) -{ - using namespace vistle::message; - - if (m_executionCount < exec->getExecutionCount()) { - m_executionCount = exec->getExecutionCount(); - m_iteration = -1; - } - - bool ret = true; - -#ifdef DETAILED_PROGRESS - Busy busy; - busy.setReferrer(exec->uuid()); - busy.setDestId(Id::LocalManager); - sendMessage(busy); -#endif - if (!m_isExecuting && (exec->what() == Execute::ComputeExecute || exec->what() == Execute::Prepare)) { - applyDelayedChanges(); - ret &= prepareWrapper(exec); - if (ret) { - m_exec = exec; - m_isExecuting = true; - } - return ret; - } - -#ifdef DETAILED_PROGRESS - message::Idle idle; - idle.setReferrer(exec->uuid()); - idle.setDestId(Id::LocalManager); - sendMessage(idle); -#endif - return ret; -} - -bool InSituReader::operate() -{ - return false; -} - -bool InSituReader::dispatch(bool block, bool *messageReceived, unsigned int minPrio) -{ - bool passMsg = false; - bool msgRecv = false; - vistle::message::Buffer buf; - if (m_receiveFromSimMessageQueue && m_receiveFromSimMessageQueue->tryReceive(buf)) { - if (buf.type() != vistle::message::INSITU) { - sendMessage(buf); - } - passMsg = true; - } - bool retval = Module::dispatch(false, &msgRecv, minPrio); - vistle::adaptive_wait(operate() || msgRecv || passMsg); - if (messageReceived) { - *messageReceived = msgRecv; - } - return retval; -} - -bool InSituReader::prepare() -{ - try { - m_shmIDs.set(vistle::Shm::the().objectID(), vistle::Shm::the().arrayID()); - } catch (const vistle::exception &ex) { - CERR << ex.what() << endl; - return false; - } - - if (!beginExecute()) { - return false; - } - return true; -} - -void InSituReader::cancelExecuteMessageReceived(const vistle::message::Message *msg) -{ - if (m_isExecuting) { - vistle::Shm::the().setArrayID(m_shmIDs.arrayID()); - vistle::Shm::the().setObjectID(m_shmIDs.objectID()); - if (!endExecute()) { - sendError("failed to prepare reduce"); - } - assert(m_exec); - reduceWrapper(m_exec); - m_isExecuting = false; - } -} - -size_t InSituReader::instanceNum() const -{ - return m_instanceNum; -} - -void insitu::InSituReader::reconnect() -{ - initRecvFromSimQueue(); - m_shmIDs.close(); - m_shmIDs.initialize(id(), rank(), std::to_string(m_numInstances), insitu::message::SyncShmIDs::Mode::Create); - m_instanceNum = m_numInstances; - ++m_numInstances; -} - -vistle::insitu::message::ModuleInfo::ShmInfo insitu::InSituReader::gatherModuleInfo() const -{ - message::ModuleInfo::ShmInfo shmInfo; - shmInfo.hostname = vistle::hostname(); - shmInfo.id = id(); - shmInfo.mpiSize = size(); - shmInfo.name = name(); - shmInfo.numCons = instanceNum(); - shmInfo.shmName = vistle::Shm::the().instanceName(); - CERR << "vistle::Shm::the().instanceName() = " << vistle::Shm::the().instanceName() - << " vistle::Shm::the().name() = " << vistle::Shm::the().name() << endl; - return shmInfo; -} - -bool vistle::insitu::InSituReader::sendMessage(const vistle::message::Message &message, const buffer *payload) const -{ - if (payload && m_isExecuting) { - CERR - << "InSituReader: can't send message with payload while executing because that would create a vistle-object" - << endl; - return false; - } - return Module::sendMessage(message, payload); -} - -void InSituReader::initRecvFromSimQueue() -{ - std::string msqName = vistle::message::MessageQueue::createName( - ("recvFromSim" + std::to_string(m_numInstances)).c_str(), id(), rank()); - std::cerr << "created msqName " << msqName << std::endl; - try { - m_receiveFromSimMessageQueue.reset(vistle::message::MessageQueue::create(msqName)); - CERR << "receiveFromSimMessageQueue name = " << msqName << std::endl; - } catch (boost::interprocess::interprocess_exception &ex) { - throw vistle::exception(std::string("opening recv from sim message queue with name ") + msqName + ": " + - ex.what()); - } -} diff --git a/lib/vistle/insitu/module/inSituReader.h b/lib/vistle/insitu/module/inSituReader.h deleted file mode 100644 index 216695bbf..000000000 --- a/lib/vistle/insitu/module/inSituReader.h +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef INSITU_READER_H -#define INSITU_READER_H - -#include "export.h" -#include -#include -#include - -namespace vistle { -namespace insitu { - -// this type of module only calls prepare and reduce at the start/ end of the execution process. -// it handles input from the manager also during execution -// when execution starts (prepare) a Simulation that shares the shm area of this module must communicate it's shm ids -// via SyncShmIDs object the sim must only create shm objects while this module m_isExecuting while this module must -// create vistle objects only if !m_isExecuting input ports are not tested on the InSituReader -class V_INSITUMODULEEXPORT InSituReader: public vistle::Module { -public: - InSituReader(const std::string &name, const int moduleID, mpi::communicator comm); - bool isExecuting(); - // use these function to make sure that the insitu process only creates vistle objects after beginExecute and before - // endExecute. - virtual bool beginExecute() = 0; - virtual bool endExecute() = 0; - virtual bool operate(); - virtual void cancelExecuteMessageReceived(const vistle::message::Message *msg) override; - size_t instanceNum() const; - void reconnect(); - message::ModuleInfo::ShmInfo gatherModuleInfo() const; - - virtual bool sendMessage(const vistle::message::Message &message, const buffer *payload = nullptr) const override; - -private: - bool handleExecute(const vistle::message::Execute *exec) override final; - bool dispatch(bool block = true, bool *messageReceived = nullptr, unsigned int minPrio = 0) override final; - bool prepare() override final; - void initRecvFromSimQueue(); - - bool m_isExecuting = false; - const vistle::message::Execute *m_exec = nullptr; - std::unique_ptr - m_receiveFromSimMessageQueue; // receives vistle messages that will be passed through to manager - size_t m_instanceNum = 0; - static size_t m_numInstances; - - insitu::message::SyncShmIDs m_shmIDs; -}; - -} // namespace insitu -} // namespace vistle - -#endif