diff --git a/CMakeLists.txt b/CMakeLists.txt index 61f39c7ff6e3..6d4c40bc57c1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -94,7 +94,7 @@ endif () # C++14 is needed by Google Test >= 1.13, for all the other parts C++11 should be enough. # This will silently fall back to C++11 if 14 is not supported by the compiler. -set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD 17) # compiler specific flags if (GNU_COMPILER OR CLANG_COMPILER) @@ -532,6 +532,8 @@ if (EIGEN3_FOUND) set(ENABLED_FEATURES "${ENABLED_FEATURES} Eigen") endif (EIGEN3_FOUND) +option(WITH_PARQUET "Compile with Parquet support" ON) + if (CHECK_OPTIONAL_LIBS) file(GLOB GDAL_PATH "${SUMO_LIBRARIES}/gdal-?.?.?") file(GLOB FFMPEG_PATH "${SUMO_LIBRARIES}/FFMPEG-?.?.?") @@ -541,6 +543,27 @@ if (CHECK_OPTIONAL_LIBS) set(CMAKE_PREFIX_PATH "${CMAKE_PREFIX_PATH};${GDAL_PATH};${FFMPEG_PATH};${OSG_PATH};${GL2PS_PATH};${GEOS_PATH}") file(GLOB SUMO_OPTIONAL_LIBRARIES_DLL "${GDAL_PATH}/bin/*.dll" "${FFMPEG_PATH}/bin/*.dll" "${OSG_PATH}/bin/*.dll" "${GL2PS_PATH}/bin/*.dll" "${JUPEDSIM_CUSTOMDIR}/bin/*.dll") + if(WITH_PARQUET) + find_package(Arrow) + find_package(Parquet) + if (Arrow_FOUND AND Parquet_FOUND) + set(HAVE_PARQUET 1) + set(ENABLED_FEATURES "${ENABLED_FEATURES} PARQUET") + set(PARQUET_LIBRARY Parquet::parquet_shared) + # if (GTEST_FOUND) + # add_definitions("GTest_SOURCE=Bundled") + # endif() + # this is for gtest compatibility + else() + message(WARNING "Parquet support requested but Arrow or Parquet libraries not found. 
Disabling Parquet support.") + set(PARQUET_LIBRARY "") + set(HAVE_PARQUET 0) + endif() + else() + set(PARQUET_LIBRARY "") + set(HAVE_PARQUET 0) + endif() + # GDAL (for geopositioning) find_package(GDAL) if (GDAL_FOUND) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1a8c2e361ffd..7fcfdd190e8c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,9 +1,14 @@ set(netconvertlibs netwrite netimport netbuild foreign_eulerspiral ${GDAL_LIBRARY} netimport_vissim netimport_vissim_typeloader netimport_vissim_tempstructs ${commonlibs} ${TCMALLOC_LIBRARY}) +if(WITH_PARQUET) + FIND_LIBRARY(PARQUET_LIBRARY NAMES parquet) +endif() + set(sumolibs traciserver netload microsim_cfmodels microsim_engine microsim_lcmodels microsim_devices microsim_trigger microsim_output microsim_transportables microsim_actions - microsim_traffic_lights microsim mesosim ${commonvehiclelibs} ${GEOS_LIBRARY}) + microsim_traffic_lights microsim mesosim ${commonvehiclelibs} ${GEOS_LIBRARY} ${PARQUET_LIBRARY}) + if (OPENSCENEGRAPH_FOUND) set(osgviewlib osgview) endif () @@ -57,6 +62,7 @@ add_executable(sumo sumo_main.cpp) set_target_properties(sumo PROPERTIES OUTPUT_NAME sumo${BINARY_SUFFIX}) set_target_properties(sumo PROPERTIES OUTPUT_NAME_DEBUG sumo${BINARY_SUFFIX}D) target_link_libraries(sumo microsim traciserver libsumostatic ${sumolibs} ${TCMALLOC_LIBRARY}) + add_dependencies(sumo generate-version-h install_dll) if (FOX_FOUND) diff --git a/src/config.h.cmake b/src/config.h.cmake index 33f5e304801c..f2a4fe219063 100644 --- a/src/config.h.cmake +++ b/src/config.h.cmake @@ -184,6 +184,9 @@ /* defined if GDAL is available */ #cmakedefine HAVE_GDAL +/* defined if PARQUET is available */ +# cmakedefine HAVE_PARQUET + /* defined if GL2PS is available */ #cmakedefine HAVE_GL2PS diff --git a/src/microsim/output/MSVTKExport.cpp b/src/microsim/output/MSVTKExport.cpp index 6887a643ccee..00c429d4a1fb 100644 --- a/src/microsim/output/MSVTKExport.cpp +++ b/src/microsim/output/MSVTKExport.cpp @@ 
-46,7 +46,7 @@ MSVTKExport::write(OutputDevice& of, SUMOTime /* timestep */) { of << "\n"; of << "\n"; of << "\n"; - of << " \n"; + of << " \n"; of << "\n"; of << " " << List2String(getSpeed()) << "\n"; of << "\n"; @@ -56,7 +56,7 @@ MSVTKExport::write(OutputDevice& of, SUMOTime /* timestep */) { of << "\n"; of << "\n"; of << " " << getOffset((int) speed.size()) << "\n"; - of << " " << speed.size() << "\n"; + of << " " << toString(speed.size()) << "\n"; of << "\n"; of << "\n"; of << " \n"; diff --git a/src/utils/common/FileHelpers.cpp b/src/utils/common/FileHelpers.cpp index dc6a7fccd535..7be7f0423aa9 100644 --- a/src/utils/common/FileHelpers.cpp +++ b/src/utils/common/FileHelpers.cpp @@ -135,6 +135,15 @@ FileHelpers::addExtension(const std::string& path, const std::string& extension) } } +std::string +FileHelpers::getExtension(const std::string& path) { + const auto beg = path.find_last_of("."); + if (beg == std::string::npos) { + return ""; + } + return path.substr(beg, path.size()); +} + std::string FileHelpers::getConfigurationRelative(const std::string& configPath, const std::string& path) { diff --git a/src/utils/common/FileHelpers.h b/src/utils/common/FileHelpers.h index afe23e3340b2..a10e88d1bf27 100644 --- a/src/utils/common/FileHelpers.h +++ b/src/utils/common/FileHelpers.h @@ -80,6 +80,13 @@ class FileHelpers { */ static std::string addExtension(const std::string& path, const std::string& extension); + /** @brief Get the file extension from the given file path + * + * @param[in] path The path to the file + * @return the file extension (with dot, example: '.xml') + */ + static std::string getExtension(const std::string& path); + /** @brief Returns the second path as a relative path to the first file * * Given the position of the configuration file, and the information where a second diff --git a/src/utils/common/MsgRetrievingFunction.h b/src/utils/common/MsgRetrievingFunction.h index 90fc96b76751..693c3d8ba626 100644 --- 
a/src/utils/common/MsgRetrievingFunction.h +++ b/src/utils/common/MsgRetrievingFunction.h @@ -50,7 +50,11 @@ class MsgRetrievingFunction : public OutputDevice { MsgRetrievingFunction(T* object, Operation operation, MsgHandler::MsgType type) : myObject(object), myOperation(operation), - myMsgType(type) {} + myMsgType(type) { + /// @todo We should design a new formatter type for this. + myFormatter = new PlainXMLFormatter(); + myStreamDevice = new OStreamDevice(new std::ostringstream()); + } /// @brief Destructor @@ -58,27 +62,11 @@ class MsgRetrievingFunction : public OutputDevice { protected: - /// @name Methods that override/implement OutputDevice-methods - /// @{ - - /** @brief Returns the associated ostream - * - * The stream is an ostringstream, actually, into which the message - * is written. It is sent when postWriteHook is called. - * - * @return The used stream - * @see postWriteHook - */ - std::ostream& getOStream() { - return myMessage; - } - - /** @brief Sends the data which was written to the string stream via the retrieving function. */ virtual void postWriteHook() { - (myObject->*myOperation)(myMsgType, myMessage.str()); - myMessage.str(""); + (myObject->*myOperation)(myMsgType, getOStream().str()); + getOStream().str(""); } /// @} @@ -92,8 +80,4 @@ class MsgRetrievingFunction : public OutputDevice { /// @brief The type of message to retrieve. MsgHandler::MsgType myMsgType; - - /// @brief message buffer - std::ostringstream myMessage; - }; diff --git a/src/utils/gui/div/GUIMessageWindow.h b/src/utils/gui/div/GUIMessageWindow.h index 6d8f94e708b6..6193f7e1cf97 100644 --- a/src/utils/gui/div/GUIMessageWindow.h +++ b/src/utils/gui/div/GUIMessageWindow.h @@ -124,29 +124,26 @@ class GUIMessageWindow : public FXText { /// @brief constructor MsgOutputDevice(GUIMessageWindow* msgWindow, GUIEventType type) : myMsgWindow(msgWindow), - myType(type) { } + myType(type){ + /// @todo We should design a new formatter type for this. 
+ myFormatter = new PlainXMLFormatter(); + myStreamDevice = new OStreamDevice(new std::ostringstream()); + } /// @brief destructor ~MsgOutputDevice() { } protected: - /// @brief get Output Stream - std::ostream& getOStream() { - return myStream; - } /// @brief write hook void postWriteHook() { - myMsgWindow->appendMsg(myType, myStream.str()); - myStream.str(""); + myMsgWindow->appendMsg(myType, getOStream().str()); + getOStream().str(""); } private: /// @brief pointer to message Windows GUIMessageWindow* myMsgWindow; - /// @brief output string stream - std::ostringstream myStream; - /// @brief type of event GUIEventType myType; }; diff --git a/src/utils/iodevices/CMakeLists.txt b/src/utils/iodevices/CMakeLists.txt index 5dd2ba85699f..3fc9620219bc 100644 --- a/src/utils/iodevices/CMakeLists.txt +++ b/src/utils/iodevices/CMakeLists.txt @@ -9,11 +9,15 @@ set(utils_iodevices_STAT_SRCS OutputDevice_File.h OutputDevice_String.cpp OutputDevice_String.h - OutputDevice_Network.cpp - OutputDevice_Network.h + # OutputDevice_Network.cpp + # OutputDevice_Network.h + OutputDevice_Parquet.cpp + OutputDevice_Parquet.h OutputFormatter.h PlainXMLFormatter.cpp PlainXMLFormatter.h + StreamDevices.h + ) add_library(utils_iodevices STATIC ${utils_iodevices_STAT_SRCS}) diff --git a/src/utils/iodevices/OutputDevice.cpp b/src/utils/iodevices/OutputDevice.cpp index 638bfaf05887..f5b473fe23a1 100644 --- a/src/utils/iodevices/OutputDevice.cpp +++ b/src/utils/iodevices/OutputDevice.cpp @@ -36,6 +36,7 @@ #include "OutputDevice_COUT.h" #include "OutputDevice_CERR.h" #include "OutputDevice_Network.h" +#include "OutputDevice_Parquet.h" #include "PlainXMLFormatter.h" #include #include @@ -77,17 +78,18 @@ OutputDevice::getDevice(const std::string& name, bool usePrefix) { } else if (name == "stderr") { dev = OutputDevice_CERR::getDevice(); } else if (FileHelpers::isSocket(name)) { - try { - const bool ipv6 = name[0] == '['; // IPv6 adresses may be written like '[::1]:8000' - const size_t sepIndex = 
name.find(":", ipv6 ? name.find("]") : 0); - const int port = StringUtils::toInt(name.substr(sepIndex + 1)); - dev = new OutputDevice_Network(ipv6 ? name.substr(1, sepIndex - 2) : name.substr(0, sepIndex), port); - } catch (NumberFormatException&) { - throw IOError("Given port number '" + name.substr(name.find(":") + 1) + "' is not numeric."); - } catch (EmptyData&) { - throw IOError(TL("No port number given.")); - } - } else { + // try { + // const bool ipv6 = name[0] == '['; // IPv6 adresses may be written like '[::1]:8000' + // const size_t sepIndex = name.find(":", ipv6 ? name.find("]") : 0); + // const int port = StringUtils::toInt(name.substr(sepIndex + 1)); + // dev = new OutputDevice_Network(ipv6 ? name.substr(1, sepIndex - 2) : name.substr(0, sepIndex), port); + // } catch (NumberFormatException&) { + // throw IOError("Given port number '" + name.substr(name.find(":") + 1) + "' is not numeric."); + // } catch (EmptyData&) { + throw IOError(TL("No port number given.")); + // } + } + else { std::string name2 = (name == "nul" || name == "NUL") ? 
"/dev/null" : name; if (usePrefix && OptionsCont::getOptions().isSet("output-prefix") && name2 != "/dev/null") { std::string prefix = OptionsCont::getOptions().getString("output-prefix"); @@ -102,11 +104,23 @@ OutputDevice::getDevice(const std::string& name, bool usePrefix) { name2 = FileHelpers::prependToLastPathComponent(prefix, name); } name2 = StringUtils::substituteEnvironment(name2, &OptionsIO::getLoadTime()); + // check the file extension + const auto file_ext = FileHelpers::getExtension(name); const int len = (int)name.length(); - dev = new OutputDevice_File(name2, len > 3 && name.substr(len - 3) == ".gz"); + if (file_ext == ".parquet" || file_ext == ".prq") { +#ifdef HAVE_PARQUET + dev = new OutputDevice_Parquet(name2); +#else + throw IOError(TL("Parquet output is not supported in this build.")); +#endif + } + else { + dev = new OutputDevice_File(name2, len > 3 && FileHelpers::getExtension(name) == ".gz"); + } } + // todo: extract this to a class method? (b.c. Parquet doesn't have an iostream) dev->setPrecision(); - dev->getOStream() << std::setiosflags(std::ios::fixed); + dev->setOSFlags(std::ios::fixed); myOutputDevices[name] = dev; return *dev; } @@ -207,9 +221,8 @@ OutputDevice::OutputDevice(const int defaultIndentation, const std::string& file myFilename(filename), myFormatter(new PlainXMLFormatter(defaultIndentation)) { } - -OutputDevice::~OutputDevice() { - delete myFormatter; +OutputDevice::OutputDevice(const std::string& filename, OutputFormatter* formatter) : + myFilename(filename), myFormatter(formatter) { } @@ -240,13 +253,13 @@ OutputDevice::close() { void OutputDevice::setPrecision(int precision) { - getOStream() << std::setprecision(precision); + getOStream().setPrecision(precision); } int OutputDevice::precision() { - return (int)getOStream().precision(); + return getOStream().precision(); } diff --git a/src/utils/iodevices/OutputDevice.h b/src/utils/iodevices/OutputDevice.h index 6029f27731d8..2a08aaf88a67 100644 --- 
a/src/utils/iodevices/OutputDevice.h +++ b/src/utils/iodevices/OutputDevice.h @@ -22,6 +22,7 @@ /****************************************************************************/ #pragma once #include +#include #include #include @@ -29,16 +30,24 @@ #include #include #include "PlainXMLFormatter.h" +#include "ParquetFormatter.h" +#include "StreamDevices.h" // =========================================================================== // class definitions // =========================================================================== +// Create an ENUM of the different writers (XML and Parquet) +enum class OutputWriterType { + XML, + PARQUET +}; + /** * @class OutputDevice * @brief Static storage of an output device and its base (abstract) implementation * - * OutputDevices are basically a capsule around an std::ostream, which give a + * OutputDevices are basically a capsule around an StreamDevice, which give a * unified access to sockets, files and stdout. * * Usually, an application builds as many output devices as needed. 
Each @@ -95,8 +104,8 @@ class OutputDevice { * @exception IOError If the output could not be built for any reason (error message is supplied) */ static bool createDeviceByOption(const std::string& optionName, - const std::string& rootElement = "", - const std::string& schemaFile = ""); + const std::string& rootElement = "", + const std::string& schemaFile = ""); /** @brief Returns the device described by the option @@ -139,9 +148,12 @@ class OutputDevice { /// @brief Constructor OutputDevice(const int defaultIndentation = 0, const std::string& filename = ""); + /// @brief Constructor + OutputDevice(const std::string& filename, OutputFormatter* formatter); + /// @brief Destructor - virtual ~OutputDevice(); + virtual ~OutputDevice() = default; /** @brief returns the information whether one can write into the device @@ -167,7 +179,7 @@ class OutputDevice { /** @brief Sets the precision or resets it to default * @param[in] precision The accuracy (number of digits behind '.') to set */ - void setPrecision(int precision = gPrecision); + virtual void setPrecision(int precision = gPrecision); /// @brief return precision set on the device int precision(); @@ -190,14 +202,14 @@ class OutputDevice { * @todo Describe what is saved */ bool writeXMLHeader(const std::string& rootElement, - const std::string& schemaFile, - std::map attrs = std::map(), - bool includeConfig = true); + const std::string& schemaFile, + std::map attrs = std::map(), + bool includeConfig = true); template bool writeHeader(const SumoXMLTag& rootElement) { - return static_cast(myFormatter)->writeHeader(getOStream(), rootElement); + return getFormatter().writeHeader(getOStream(), rootElement); } @@ -210,7 +222,7 @@ class OutputDevice { * @param[in] xmlElement Name of element to open * @return The OutputDevice for further processing */ - OutputDevice& openTag(const std::string& xmlElement); + virtual OutputDevice& openTag(const std::string& xmlElement); /** @brief Opens an XML tag @@ -220,7 +232,7 @@ class 
OutputDevice { * @param[in] xmlElement Id of the element to open * @return The OutputDevice for further processing */ - OutputDevice& openTag(const SumoXMLTag& xmlElement); + virtual OutputDevice& openTag(const SumoXMLTag& xmlElement); /** @brief Closes the most recently opened tag and optionally adds a comment @@ -233,7 +245,7 @@ class OutputDevice { * @return Whether a further element existed in the stack and could be closed * @todo it is not verified that the topmost element was closed */ - bool closeTag(const std::string& comment = ""); + virtual bool closeTag(const std::string& comment = ""); @@ -252,8 +264,7 @@ class OutputDevice { */ template OutputDevice& writeAttr(const SumoXMLAttr attr, const T& val) { - PlainXMLFormatter::writeAttr(getOStream(), attr, val); - return *this; + return writeAttr(toString(attr), val); } inline bool useAttribute(const SumoXMLAttr attr, SumoXMLAttrMask attributeMask) const { @@ -271,7 +282,7 @@ class OutputDevice { OutputDevice& writeOptionalAttr(const SumoXMLAttr attr, const T& val, long long int attributeMask) { assert((int)attr <= 63); if (attributeMask == 0 || useAttribute(attr, attributeMask)) { - PlainXMLFormatter::writeAttr(getOStream(), attr, val); + writeAttr(attr, val); } return *this; } @@ -279,7 +290,7 @@ class OutputDevice { OutputDevice& writeOptionalAttr(const SumoXMLAttr attr, const T& val, SumoXMLAttrMask attributeMask) { assert((int)attr <= (int)attributeMask.size()); if (attributeMask.none() || useAttribute(attr, attributeMask)) { - PlainXMLFormatter::writeAttr(getOStream(), attr, val); + writeAttr(attr, val); } return *this; } @@ -293,7 +304,24 @@ class OutputDevice { */ template OutputDevice& writeAttr(const std::string& attr, const T& val) { - PlainXMLFormatter::writeAttr(getOStream(), attr, val); + switch (this->getType()) + { + case OutputWriterType::XML: + // cast the writer to the correct type + getFormatter().writeAttr(getOStream(), attr, val); + break; + case OutputWriterType::PARQUET: +#ifdef 
HAVE_PARQUET + // cast the writer to the correct type + getFormatter().writeAttr(getOStream(), attr, val); +#else + throw IOError("Parquet output is not supported in this build. Please recompile with the correct options."); +#endif + break; + default: + throw IOError("Unknown output writer type"); + break; + } return *this; } @@ -341,7 +369,14 @@ class OutputDevice { */ template OutputDevice& operator<<(const T& t) { - getOStream() << t; + // getOStream() << t; + // get the correct formatter + if (this->getOStream().allowRaw()) { + writeRaw(t); + } + else { + throw IOError("Raw output is not allowed for this output device"); + } postWriteHook(); return *this; } @@ -354,17 +389,51 @@ class OutputDevice { return myFormatter->wroteHeader(); } + /// @todo should move to the formatter + /// @brief Applies the given format flags to the underlying stream device + virtual void setOSFlags(std::ios_base::fmtflags flags) { + getOStream().setOSFlags(flags); + } + + // @brief handle the raw write + template + void writeRaw(const T& val) { + // cast the writer to the correct type + if (this->getType() == OutputWriterType::XML) { + getFormatter().writeRaw(getOStream(), val); + } + else { + throw IOError("Raw output is not supported for this output type"); + } + } + protected: /// @brief Returns the associated ostream - virtual std::ostream& getOStream() = 0; + virtual StreamDevice& getOStream() { + return *myStreamDevice; + }; + /// @brief Returns the stream device as a T* (unchecked static cast) + template + T* getStreamDevice() { + return static_cast(myStreamDevice); + } + /// @brief Returns the writer type of this device (XML in this base implementation) + virtual OutputWriterType getType() const { + return OutputWriterType::XML; + } /** @brief Called after every write access. * * Default implementation does nothing. 
*/ virtual void postWriteHook(); + /// @brief Returns the formatter + OutputFormatter& getFormatter() { + return *myFormatter; + } + private: /// @brief map from names to output devices @@ -376,9 +445,17 @@ class OutputDevice { protected: const std::string myFilename; -private: + /// @brief the stream device + std::unique_ptr myStreamDevice{nullptr}; + /// @brief The formatter for XML - OutputFormatter* const myFormatter; + std::unique_ptr myFormatter{nullptr}; + + /// @brief return a type casted formatter + template + T& getFormatter() { + return static_cast(*myFormatter); + } private: /// @brief Invalidated copy constructor. @@ -386,5 +463,4 @@ class OutputDevice { /// @brief Invalidated assignment operator. OutputDevice& operator=(const OutputDevice&) = delete; - }; diff --git a/src/utils/iodevices/OutputDevice_CERR.cpp b/src/utils/iodevices/OutputDevice_CERR.cpp index 7174694e5003..9a26749194d2 100644 --- a/src/utils/iodevices/OutputDevice_CERR.cpp +++ b/src/utils/iodevices/OutputDevice_CERR.cpp @@ -47,6 +47,7 @@ OutputDevice_CERR::getDevice() { // method definitions // =========================================================================== OutputDevice_CERR::OutputDevice_CERR() : OutputDevice(0, "CERR") { + myStreamDevice = std::make_unique(std::cerr); } @@ -55,15 +56,10 @@ OutputDevice_CERR::~OutputDevice_CERR() { } -std::ostream& -OutputDevice_CERR::getOStream() { - return std::cerr; -} - - void OutputDevice_CERR::postWriteHook() { std::cerr.flush(); + getOStream().flush(); } diff --git a/src/utils/iodevices/OutputDevice_CERR.h b/src/utils/iodevices/OutputDevice_CERR.h index 4d5e2062c0d9..28700671fb16 100644 --- a/src/utils/iodevices/OutputDevice_CERR.h +++ b/src/utils/iodevices/OutputDevice_CERR.h @@ -41,20 +41,11 @@ class OutputDevice_CERR : public OutputDevice { protected: - /// @name Methods that override/implement OutputDevice-methods - /// @{ - - /** @brief Returns the associated ostream - * @return cerr - */ - std::ostream& getOStream(); - - /** 
@brief Called after every write access. * * Calls flush on stderr. */ - virtual void postWriteHook(); + virtual void postWriteHook() override; /// @} @@ -67,11 +58,10 @@ class OutputDevice_CERR : public OutputDevice { /// @brief Destructor ~OutputDevice_CERR(); - + private: /// @brief my singular instance static OutputDevice* myInstance; - }; diff --git a/src/utils/iodevices/OutputDevice_COUT.cpp b/src/utils/iodevices/OutputDevice_COUT.cpp index a6ff12d5f26d..4702961143f2 100644 --- a/src/utils/iodevices/OutputDevice_COUT.cpp +++ b/src/utils/iodevices/OutputDevice_COUT.cpp @@ -47,6 +47,7 @@ OutputDevice_COUT::getDevice() { // method definitions // =========================================================================== OutputDevice_COUT::OutputDevice_COUT() : OutputDevice(0, "COUT") { + myStreamDevice = std::make_unique(); } @@ -54,16 +55,10 @@ OutputDevice_COUT::~OutputDevice_COUT() { myInstance = nullptr; } - -std::ostream& -OutputDevice_COUT::getOStream() { - return std::cout; -} - - void OutputDevice_COUT::postWriteHook() { - std::cout.flush(); + myStreamDevice->flush(); + getOStream().flush(); } diff --git a/src/utils/iodevices/OutputDevice_COUT.h b/src/utils/iodevices/OutputDevice_COUT.h index 1bf862d877e9..a7230ed20a27 100644 --- a/src/utils/iodevices/OutputDevice_COUT.h +++ b/src/utils/iodevices/OutputDevice_COUT.h @@ -44,17 +44,11 @@ class OutputDevice_COUT : public OutputDevice { /// @name Methods that override/implement OutputDevice-methods /// @{ - /** @brief Returns the associated ostream - * @return cout - */ - std::ostream& getOStream(); - - /** @brief Called after every write access. * * Calls flush on stdout. 
*/ - virtual void postWriteHook(); + virtual void postWriteHook() override; /// @} @@ -73,5 +67,4 @@ class OutputDevice_COUT : public OutputDevice { /// @brief my singular instance static OutputDevice* myInstance; - }; diff --git a/src/utils/iodevices/OutputDevice_File.cpp b/src/utils/iodevices/OutputDevice_File.cpp index 7318dfdc80f4..8bf5b6823f3a 100644 --- a/src/utils/iodevices/OutputDevice_File.cpp +++ b/src/utils/iodevices/OutputDevice_File.cpp @@ -40,7 +40,7 @@ OutputDevice_File::OutputDevice_File(const std::string& fullName, const bool com if (fullName == "/dev/null") { myAmNull = true; #ifdef WIN32 - myFileStream = new std::ofstream("NUL"); + myStreamDevice = std::make_unique(new std::ofstream("NUL")); if (!myFileStream->good()) { delete myFileStream; throw IOError(TLF("Could not redirect to NUL device (%).", std::string(std::strerror(errno)))); @@ -52,35 +52,22 @@ OutputDevice_File::OutputDevice_File(const std::string& fullName, const bool com #ifdef HAVE_ZLIB if (compressed) { try { - myFileStream = new zstr::ofstream(localName.c_str(), std::ios_base::out); + myStreamDevice = std::make_unique(new zstr::ofstream(localName.c_str(), std::ios_base::out)); } catch (strict_fstream::Exception& e) { throw IOError("Could not build output file '" + fullName + "' (" + e.what() + ")."); } catch (zstr::Exception& e) { throw IOError("Could not build output file '" + fullName + "' (" + e.what() + ")."); } } else { - myFileStream = new std::ofstream(localName.c_str(), std::ios_base::out); + myStreamDevice = std::make_unique(new std::ofstream(localName.c_str(), std::ios_base::out)); } #else UNUSED_PARAMETER(compressed); - myFileStream = new std::ofstream(localName.c_str(), std::ios_base::out); + myStreamDevice = std::make_unique(new std::ofstream(localName.c_str(), std::ios_base::out)); #endif - if (!myFileStream->good()) { - delete myFileStream; + if (!myStreamDevice->good()) { + myStreamDevice.reset(); throw IOError("Could not build output file '" + fullName + "' (" + 
std::strerror(errno) + ")."); } } - - -OutputDevice_File::~OutputDevice_File() { - delete myFileStream; -} - - -std::ostream& -OutputDevice_File::getOStream() { - return *myFileStream; -} - - /****************************************************************************/ diff --git a/src/utils/iodevices/OutputDevice_File.h b/src/utils/iodevices/OutputDevice_File.h index cc3ec46cfea4..cdc9a237bc28 100644 --- a/src/utils/iodevices/OutputDevice_File.h +++ b/src/utils/iodevices/OutputDevice_File.h @@ -23,6 +23,7 @@ #include #include "OutputDevice.h" +#include "StreamDevices.h" // =========================================================================== @@ -44,10 +45,6 @@ class OutputDevice_File : public OutputDevice { */ OutputDevice_File(const std::string& fullName, const bool compressed = false); - - /// @brief Destructor - ~OutputDevice_File(); - /** @brief returns the information whether the device will discard all output * @return Whether the device redirects to /dev/null */ @@ -55,22 +52,7 @@ class OutputDevice_File : public OutputDevice { return myAmNull; } - -protected: - /// @name Methods that override/implement OutputDevice-methods - /// @{ - - /** @brief Returns the associated ostream - * @return The used stream - */ - std::ostream& getOStream() override; - /// @} - - private: - /// The wrapped ofstream - std::ostream* myFileStream = nullptr; - /// am I redirecting to /dev/null bool myAmNull = false; diff --git a/src/utils/iodevices/OutputDevice_Network.cpp b/src/utils/iodevices/OutputDevice_Network.cpp index 22227f969b0d..65987aac267a 100644 --- a/src/utils/iodevices/OutputDevice_Network.cpp +++ b/src/utils/iodevices/OutputDevice_Network.cpp @@ -51,6 +51,7 @@ OutputDevice_Network::OutputDevice_Network(const std::string& host, std::this_thread::sleep_for(std::chrono::seconds(wait)); } } + myStreamDevice = new StringStream(); } @@ -60,16 +61,10 @@ OutputDevice_Network::~OutputDevice_Network() { } -std::ostream& -OutputDevice_Network::getOStream() { - return 
myMessage; -} - - void OutputDevice_Network::postWriteHook() { - const std::string toSend = myMessage.str(); - myMessage.str(""); + const std::string toSend = myStreamDevice->str(); + myStreamDevice->str(""); if (toSend.empty() || !mySocket->has_client_connection()) { return; } diff --git a/src/utils/iodevices/OutputDevice_Network.h b/src/utils/iodevices/OutputDevice_Network.h index d7c7ba9feb06..a75253aeaede 100644 --- a/src/utils/iodevices/OutputDevice_Network.h +++ b/src/utils/iodevices/OutputDevice_Network.h @@ -63,18 +63,6 @@ class OutputDevice_Network : public OutputDevice { protected: /// @name Methods that override/implement OutputDevice-methods /// @{ - - /** @brief Returns the associated ostream - * - * The stream is an ostringstream, actually, into which the message - * is written. It is sent when postWriteHook is called. - * - * @return The used stream - * @see postWriteHook - */ - std::ostream& getOStream(); - - /** @brief Sends the data which was written to the string stream over the socket. * * Converts the stored message into a vector of chars and sends them via to @@ -84,9 +72,6 @@ class OutputDevice_Network : public OutputDevice { /// @} private: - /// @brief packet buffer - std::ostringstream myMessage; - /// @brief the socket to transfer the data tcpip::Socket* mySocket; diff --git a/src/utils/iodevices/OutputDevice_Parquet.cpp b/src/utils/iodevices/OutputDevice_Parquet.cpp new file mode 100644 index 000000000000..a61ddd143734 --- /dev/null +++ b/src/utils/iodevices/OutputDevice_Parquet.cpp @@ -0,0 +1,100 @@ +/****************************************************************************/ +// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo +// Copyright (C) 2004-2024 German Aerospace Center (DLR) and others. 
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0/
+// This Source Code may also be made available under the following Secondary
+// Licenses when the conditions for such availability set forth in the Eclipse
+// Public License 2.0 are satisfied: GNU General Public License, version 2
+// or later which is available at
+// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
+// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
+/****************************************************************************/
+/// @file OutputDevice_Parquet.cpp
+/// @author Daniel Krajzewicz
+/// @author Michael Behrisch
+/// @author Jakob Erdmann
+/// @author Max Schrader
+/// @date 2024
+///
+// An output device that encapsulates a Parquet file
+/****************************************************************************/
+#include 
+
+#ifdef HAVE_PARQUET
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+
+#include "OutputDevice_Parquet.h"
+
+#include 
+#include 
+
+#include 
+#include 
+#include 
+#include 
+
+
+// ===========================================================================
+// method definitions
+// ===========================================================================
+OutputDevice_Parquet::OutputDevice_Parquet(const std::string& fullName)
+    : OutputDevice(fullName, new ParquetFormatter()) {
+    // set the type of compression. TODO this should be based on the build options
+    builder.compression(parquet::Compression::ZSTD);
+}
+
+
+bool OutputDevice_Parquet::closeTag(const std::string& comment) {
+    UNUSED_PARAMETER(comment);
+    // The output file is opened lazily: only when a tag at depth >= 2 (i.e. a child tag) is closed and no file is open yet.
+    //! @todo this is a bit of a hack, but it works for now
+    auto formatter = dynamic_cast(&this->getFormatter()); // NOTE(review): cast target is missing in this copy of the patch - presumably ParquetFormatter*; confirm
+    if (formatter == nullptr) {
+        throw IOError("Formatter is not a ParquetFormatter");
+    }
+    if (formatter->getDepth() < 2) {
+        // we have to clean up the stack, otherwise the file will not be written correctly
+        // when it is open
+        formatter->clearStack();
+        // this is critical for the file to be written correctly
+        return false;
+    }
+    if (myFile == nullptr) {
+        // Create a Parquet file
+        PARQUET_ASSIGN_OR_THROW(
+            this->myFile, arrow::io::FileOutputStream::Open(this->myFilename));
+
+        this->myStreamDevice = std::make_unique(parquet::ParquetFileWriter::Open(this->myFile, std::static_pointer_cast(
+            parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, formatter->getNodeVector())
+        ), this->builder.build()));
+
+        // check if the file was opened correctly
+        if (this->myFile->closed()) {
+            throw IOError("Could not build output file '" + this->myFilename + "' (" + std::strerror(errno) + ").");
+        }
+    }
+    // now actually write the data
+    return formatter->closeTag(getOStream());
+}
+
+
+OutputDevice_Parquet::~OutputDevice_Parquet() {
+    // have to delete the stream device before the file. This dumps unwritten data to the file
+    myStreamDevice.reset();
+    // close the file (if open)
+    if (this->myFile.get() == nullptr) {
+        return;
+    }
+    [[maybe_unused]] arrow::Status status = this->myFile->Close();
+}
+
+#endif
+/****************************************************************************/
diff --git a/src/utils/iodevices/OutputDevice_Parquet.h b/src/utils/iodevices/OutputDevice_Parquet.h
new file mode 100644
index 000000000000..1358395260e6
--- /dev/null
+++ b/src/utils/iodevices/OutputDevice_Parquet.h
@@ -0,0 +1,105 @@
+/****************************************************************************/
+// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
+// Copyright (C) 2004-2024 German Aerospace Center (DLR) and others.
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0/
+// This Source Code may also be made available under the following Secondary
+// Licenses when the conditions for such availability set forth in the Eclipse
+// Public License 2.0 are satisfied: GNU General Public License, version 2
+// or later which is available at
+// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
+// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
+/****************************************************************************/
+/// @file OutputDevice_Parquet.h
+/// @author Daniel Krajzewicz
+/// @author Michael Behrisch
+/// @author Jakob Erdmann
+/// @author Max Schrader
+/// @date 2024
+///
+// An output device that encapsulates a Parquet file
+/****************************************************************************/
+#pragma once
+
+#include
+
+#ifdef HAVE_PARQUET
+
+#include
+#include "OutputDevice.h"
+#include "ParquetFormatter.h"
+
+#include
+#include
+#include
+#include
+#include
+
+
+
+/**
+ * @class OutputDevice_Parquet
+ * @brief An output device that encapsulates a Parquet stream writer
+ *
+ * Please note that the device is responsible for the stream and deletes
+ * it (it should not be deleted elsewhere).
+ */
+class OutputDevice_Parquet : public OutputDevice {
+public:
+    /** @brief Constructor; the file itself is not opened here but on a later closeTag call
+     * @param[in] fullName The name of the output file to use
+     * @exception IOError Should not be thrown by this implementation
+     */
+    OutputDevice_Parquet(const std::string& fullName);
+
+    /// @brief Destructor
+    ~OutputDevice_Parquet() override;
+
+    /** @brief implements the close tag logic. This is where the file is first opened and the schema is created.
+     * This exploits the fact that for *most* SUMO files, all the fields are present at the first close tag event.
+     */
+    bool closeTag(const std::string& comment) override;
+
+    /** @brief writes a line feed if applicable. overridden from the base class to do nothing
+     *  NOTE(review): declared without `override`; confirm the base lf() is virtual, else this only hides it */
+    void lf() {};
+
+    // precision has no meaning for this device, the call is ignored
+    void setPrecision(int precision) override {
+        UNUSED_PARAMETER(precision);
+    };
+
+    void setOSFlags(std::ios_base::fmtflags flags) override {
+        UNUSED_PARAMETER(flags);
+    };
+
+protected:
+
+    /// @brief Returns the writer type of this device (always PARQUET)
+    OutputWriterType getType() const override {
+        return OutputWriterType::PARQUET;
+    }
+
+    /// do I allow optional attributes
+    bool allowOptionalAttributes = false;
+
+private:
+    /// The output file; stays nullptr until closeTag opens it
+    std::shared_ptr myFile = nullptr;
+    // the builder for the writer properties
+    parquet::WriterProperties::Builder builder;
+    // the schema (NOTE(review): not referenced by the methods in OutputDevice_Parquet.cpp - confirm it is needed)
+    std::shared_ptr schema;
+
+    /// am I redirecting to /dev/null
+    bool myAmNull = false;
+
+    /// my full name, used in the error message raised when the output file cannot be built
+    std::string myFullName;
+
+    parquet::schema::NodeVector myNodeVector;
+
+};
+
+#endif // HAVE_PARQUET
\ No newline at end of file
diff --git a/src/utils/iodevices/OutputDevice_String.cpp b/src/utils/iodevices/OutputDevice_String.cpp
index a4f1e3cb5d4d..a9719c119f15 100644
--- a/src/utils/iodevices/OutputDevice_String.cpp
+++ b/src/utils/iodevices/OutputDevice_String.cpp
@@ -29,25 +29,14 @@ // =========================================================================== OutputDevice_String::OutputDevice_String(const int defaultIndentation) : OutputDevice(defaultIndentation) { - setPrecision(); - myStream << std::setiosflags(std::ios::fixed); + myStreamDevice = std::make_unique(new std::ostringstream()); + myStreamDevice->setOSFlags(std::ios::fixed); + myStreamDevice->setPrecision(2); } - -OutputDevice_String::~OutputDevice_String() { -} - - std::string OutputDevice_String::getString() const { - return myStream.str(); + return myStreamDevice->str(); } - -std::ostream& -OutputDevice_String::getOStream() { - return myStream; -} - -
/****************************************************************************/ diff --git a/src/utils/iodevices/OutputDevice_String.h b/src/utils/iodevices/OutputDevice_String.h index ee7fef776a86..3c6f0a20649e 100644 --- a/src/utils/iodevices/OutputDevice_String.h +++ b/src/utils/iodevices/OutputDevice_String.h @@ -41,29 +41,8 @@ class OutputDevice_String : public OutputDevice { */ OutputDevice_String(const int defaultIndentation = 0); - - /// @brief Destructor - ~OutputDevice_String(); - - /** @brief Returns the current content as a string * @return The content as string */ std::string getString() const; - -protected: - /// @name Methods that override/implement OutputDevice-methods - /// @{ - - /** @brief Returns the associated ostream - * @return The used stream - */ - std::ostream& getOStream(); - /// @} - - -private: - /// The wrapped ofstream - std::ostringstream myStream; - }; diff --git a/src/utils/iodevices/OutputFormatter.h b/src/utils/iodevices/OutputFormatter.h index a0c896513790..29f5c566a863 100644 --- a/src/utils/iodevices/OutputFormatter.h +++ b/src/utils/iodevices/OutputFormatter.h @@ -24,6 +24,7 @@ #include #include #include +#include "StreamDevices.h" // =========================================================================== @@ -49,7 +50,7 @@ class RGBColor; class OutputFormatter { public: /// @brief Destructor - virtual ~OutputFormatter() { } + virtual ~OutputFormatter() = default; /** @brief Writes an XML header with optional configuration @@ -63,10 +64,19 @@ class OutputFormatter { * @todo Check which parameter is used herein * @todo Describe what is saved */ - virtual bool writeXMLHeader(std::ostream& into, const std::string& rootElement, + virtual bool writeXMLHeader(StreamDevice& into, const std::string& rootElement, const std::map& attrs, bool includeConfig = true) = 0; + /** @brief Writes an XML header + * + * + * @param[in] into The output stream to use + * @param[in] rootElement The root element to use + * @return Whether the header 
was written + */ + virtual bool writeHeader(StreamDevice& into, const SumoXMLTag& rootElement) = 0; + /** @brief Opens an XML tag * @@ -78,7 +88,7 @@ class OutputFormatter { * @param[in] xmlElement Name of element to open * @return The OutputDevice for further processing */ - virtual void openTag(std::ostream& into, const std::string& xmlElement) = 0; + virtual void openTag(StreamDevice& into, const std::string& xmlElement) = 0; /** @brief Opens an XML tag @@ -88,7 +98,7 @@ class OutputFormatter { * @param[in] into The output stream to use * @param[in] xmlElement Id of the element to open */ - virtual void openTag(std::ostream& into, const SumoXMLTag& xmlElement) = 0; + virtual void openTag(StreamDevice& into, const SumoXMLTag& xmlElement) = 0; /** @brief Closes the most recently opened tag and optinally add a comment @@ -97,11 +107,17 @@ class OutputFormatter { * @return Whether a further element existed in the stack and could be closed * @todo it is not verified that the topmost element was closed */ - virtual bool closeTag(std::ostream& into, const std::string& comment = "") = 0; + virtual bool closeTag(StreamDevice& into, const std::string& comment = "") = 0; - virtual void writePreformattedTag(std::ostream& into, const std::string& val) = 0; + virtual void writePreformattedTag(StreamDevice& into, const std::string& val) = 0; - virtual void writePadding(std::ostream& into, const std::string& val) = 0; + virtual void writePadding(StreamDevice& into, const std::string& val) = 0; virtual bool wroteHeader() const = 0; + + template + void writeRaw(StreamDevice& into, const T& val){ + + into << val; + } }; diff --git a/src/utils/iodevices/ParquetFormatter.h b/src/utils/iodevices/ParquetFormatter.h new file mode 100644 index 000000000000..641e22d73683 --- /dev/null +++ b/src/utils/iodevices/ParquetFormatter.h @@ -0,0 +1,468 @@ +/****************************************************************************/ +// Eclipse SUMO, Simulation of Urban MObility; see 
https://eclipse.dev/sumo +// Copyright (C) 2012-2024 German Aerospace Center (DLR) and others. +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0/ +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License 2.0 are satisfied: GNU General Public License, version 2 +// or later which is available at +// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later +/****************************************************************************/ +/// @file PlainXMLFormatter.h +/// @author Daniel Krajzewicz +/// @author Michael Behrisch +/// @date 2012 +/// +// Output formatter for Parquet output +/****************************************************************************/ +#pragma once +#include + +#ifdef HAVE_PARQUET +// parquet-cpp +#include +#include + +#include +#include +#include + +#include "OutputFormatter.h" +#include +#include "StreamDevices.h" + +#define PARQUET_TESTING + + +// Helper function to determine if a type is a fixed-length character array +template +struct is_fixed_char_array : std::false_type {}; + +template +struct is_fixed_char_array : std::true_type {}; + +// Helper template for the static_assert +template +constexpr bool always_false = false; + +// Overloaded function for different types +template +void AppendField(parquet::schema::NodeVector& fields, const T& val, const std::string& field_name) { + UNUSED_PARAMETER(val); + if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::OPTIONAL, parquet::Type::BYTE_ARRAY, + parquet::ConvertedType::UTF8)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, 
parquet::Repetition::REQUIRED, parquet::Type::FIXED_LEN_BYTE_ARRAY, + parquet::ConvertedType::NONE, 1)); + } + else if constexpr (is_fixed_char_array::value) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::FIXED_LEN_BYTE_ARRAY, + parquet::ConvertedType::NONE, sizeof(T))); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::INT_8)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::UINT_16)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::INT_32)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::OPTIONAL, parquet::Type::INT64, + parquet::ConvertedType::UINT_64)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::DOUBLE, + parquet::ConvertedType::NONE)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::INT64, + parquet::ConvertedType::TIMESTAMP_MICROS)); + } + else if constexpr (std::is_same_v) { + fields.push_back(parquet::schema::PrimitiveNode::Make( + field_name, parquet::Repetition::REQUIRED, parquet::Type::INT64, + parquet::ConvertedType::TIMESTAMP_MILLIS)); + } + else { + // // warn + // fmt::print("Unsupported type for AppendField\n"); + // static_assert(always_false, "Unsupported type for AppendField"); + + } + // else { + // static_assert(always_false, "Unsupported 
type for AppendField"); + // } +} + +// =========================================================================== +// class definitions +// =========================================================================== +/** + * @class TypedAttribute + * @brief A class to represent an attribute with a specific type + * + * This class is used to represent an attribute of an XML XMLElement with a specific type. + */ + // Base class +class AttributeBase { +public: + AttributeBase(std::string name) : name_(std::move(name)) {} + virtual ~AttributeBase() = default; + + const std::string& getName() const { return name_; } + + // Pure virtual function for printing + virtual void print(StreamDevice& os) const = 0; + +private: + std::string name_; +}; + +// Helper function to convert various types to Parquet-compatible types +template +auto convertToParquetType(const T& value) { + if constexpr (std::is_same_v) { + if constexpr (sizeof(unsigned long) <= sizeof(uint32_t)) { + return static_cast(value); + } else { + return static_cast(value); + } + } else if constexpr (std::is_same_v) { + return value; + } else if constexpr (std::is_integral_v) { + if constexpr (std::is_signed_v) { + if constexpr (sizeof(T) <= 1) return static_cast(value); + else if constexpr (sizeof(T) <= 2) return static_cast(value); + else if constexpr (sizeof(T) <= 4) return static_cast(value); + else return static_cast(value); + } else { + if constexpr (sizeof(T) <= 1) return static_cast(value); + else if constexpr (sizeof(T) <= 2) return static_cast(value); + else if constexpr (sizeof(T) <= 4) return static_cast(value); + else return static_cast(value); + } + } else if constexpr (std::is_floating_point_v) { + if constexpr (sizeof(T) <= 4) return static_cast(value); + else return static_cast(value); + } else if constexpr (std::is_same_v || + std::is_same_v) { + return value; + } else if constexpr (std::is_same_v) { + return value; + } else if constexpr (std::is_array_v) { + // try the toString function + 
return toString(value); + } else if constexpr (std::is_same_v || + std::is_same_v || + std::is_same_v) { + // have to take a copy of the string, to ensure its lifetime is long enough + return std::string(value); + } else { + // For any other type, convert to string + return toString(value); + } +} + +template +class Attribute : public AttributeBase { +public: + Attribute(const std::string& name, const T& value) + : AttributeBase(name), value_(convertToParquetType(value)) {} + + void print(StreamDevice& os) const override { + if (value_){ + os << *value_; + } else{ + assert(false); + } + } + +private: + std::optional()))> value_; +}; + + +class XMLElement { +public: + /// @brief Constructor + explicit XMLElement(std::string name) : myName(std::move(name)), beenWritten(false) {} + + /// @brief Destructor + virtual ~XMLElement() = default; + + /// @brief Move constructor + XMLElement(XMLElement&& other) noexcept = default; + + + /// @brief Move assignment operator + XMLElement& operator=(XMLElement&& other) noexcept = default; + + /// @brief Add an attribute to the XMLElement + /// @param attr The attribute to add + void addAttribute(std::unique_ptr attr) { + myAttributes.push_back(std::move(attr)); + } + + // define a comparison operator (just checks the name) + bool operator==(const XMLElement& other) const { + return myName == other.myName; + } + + // define a comparison operator (just checks the name) + bool operator==(const std::string& other) const { + return myName == other; + } + + /// @brief a method to write the XMLElement to a stream using the << operator + friend StreamDevice& operator<<(StreamDevice& into, const XMLElement& elem) { + for (const auto& attr : elem.myAttributes) { + attr->print(into); + } + return into; + } + + /// @brief a method to check whether the XMLElement has been written + bool written() const { + return beenWritten; + } + + /// @brief a method to set the XMLElement as written + void setWritten() { + beenWritten = true; + } + + /// 
@brief get the attributes
+    const std::vector>& getAttributes() const {
+        return myAttributes;
+    }
+
+    /// @brief get the name of the element
+    const std::string& getName() const {
+        return myName;
+    }
+
+protected:
+    /// @brief The name of the XMLElement
+    std::string myName;
+
+    /// @brief stores whether the XMLElement has been written
+    bool beenWritten;
+
+    /// @brief a store for the attributes
+    std::vector> myAttributes;
+};
+
+/**
+ * @class ParquetFormatter
+ * @brief Output formatter for Parquet output
+ *
+ * ParquetFormatter collects the attributes of the currently open elements and
+ * writes them as one row to the stream device when a tag is closed.
+ */
+class ParquetFormatter : public OutputFormatter {
+public:
+    /// @brief Constructor
+    ParquetFormatter() {};
+
+    /// @brief Destructor
+    virtual ~ParquetFormatter() = default;
+
+    /** @brief Accepts the XML header request without writing anything
+     *
+     * All parameters are ignored by this formatter.
+     *
+     * @param[in] into The output stream to use (unused)
+     * @param[in] rootXMLElement The root XMLElement to use (unused)
+     * @param[in] attrs Additional attributes to save within the rootXMLElement (unused)
+     * @return always false
+     */
+    bool writeXMLHeader(StreamDevice& into, const std::string& rootXMLElement,
+                        const std::map& attrs,
+                        bool includeConfig = true) override {
+        UNUSED_PARAMETER(into);
+        UNUSED_PARAMETER(rootXMLElement);
+        UNUSED_PARAMETER(attrs);
+        UNUSED_PARAMETER(includeConfig);
+        return false;
+    };
+
+
+    /** @brief Opens an XML tag
+     *
+     * Nothing is written to the stream; a new XMLElement with the given name
+     * is pushed onto the stack and collects the attributes written afterwards.
+     *
+     * @param[in] into The output stream to use (unused)
+     * @param[in] xmlXMLElement Name of XMLElement to open
+     */
+    void openTag(StreamDevice& into, const std::string& xmlXMLElement) override {
+        UNUSED_PARAMETER(into);
+#ifdef PARQUET_TESTING
+        // assert that the stack does not contain the XMLElement
+        assert(std::find(myXMLStack.begin(), myXMLStack.end(), xmlXMLElement) == myXMLStack.end());
+#endif
+        myXMLStack.push_back(XMLElement(xmlXMLElement));
+    }
+
+    /** @brief Opens an XML tag
+     *
+     * Helper method which finds the correct string before calling openTag.
+     *
+     * @param[in] into The output stream to use
+     * @param[in] xmlXMLElement Id of the XMLElement to open
+     */
+    inline void openTag(StreamDevice& into, const SumoXMLTag& xmlXMLElement) override {
+        openTag(into, toString(xmlXMLElement));
+    };
+
+
+    /** @brief Closes the most recently opened tag
+     *
+     * If the innermost element was not written yet, every element on the stack
+     * is written to the stream and the row is ended; then the innermost
+     * element is removed from the stack.
+     *
+     * @param[in] into The output stream to use
+     * @param[in] comment ignored
+     * @return always false, also when an element was closed
+     */
+    inline bool closeTag(StreamDevice& into, const std::string& comment = "") override {
+        UNUSED_PARAMETER(comment);
+        if (myXMLStack.empty()) {
+            return false;
+        }
+
+        // only check the last XMLElement
+        if (!myXMLStack.back().written()) {
+            for (auto& elem : myXMLStack) {
+                into << elem;
+                elem.setWritten();
+            }
+            // close the row
+            into.endLine();
+        }
+        // pop the last XMLElement and remove from memory
+        myXMLStack.pop_back();
+        return false;
+    };
+
+
+    /** @brief preformatted tags are not supported by this formatter, the call is ignored
+     * @param[in] into The output stream to use (unused)
+     * @param[in] val The preformatted data (unused)
+     */
+    void writePreformattedTag(StreamDevice& into, const std::string& val) override {
+        // don't take any action
+        UNUSED_PARAMETER(into);
+        UNUSED_PARAMETER(val);
+        return;
+    };
+
+    /** @brief padding has no meaning for this formatter, the call is ignored
+     */
+    inline void writePadding(StreamDevice& into, const std::string& val) override {
+        UNUSED_PARAMETER(into);
+        UNUSED_PARAMETER(val);
+    };
+
+
+    /** @brief stores an attribute on the innermost open element
+     *
+     * As long as the node vector was not handed out by getNodeVector, the first
+     * occurrence of an attribute name also adds a field to the schema.
+     *
+     * @param[in] into The output stream to use (unused)
+     * @param[in] attr The attribute (name)
+     * @param[in] val The attribute value
+     * @exception std::runtime_error if no tag is open
+     */
+    template
+    void writeAttr(StreamDevice& into, const std::string& attr, const T& val) {
+        UNUSED_PARAMETER(into);
+        // the attribute goes to the innermost open element; back() on an empty stack is undefined behaviour
+        if (myXMLStack.empty()) {
+            throw std::runtime_error("writeAttr called without an open tag in ParquetFormatter");
+        }
+        std::unique_ptr typed_attr = std::make_unique>(attr, val);
+        this->myXMLStack.back().addAttribute(std::move(typed_attr));
+        if (!sharedNodeVector && this->fields.find(attr) == this->fields.end()) {
+            // add the field to the schema
+            AppendField(myNodeVector, val, attr);
+            this->fields.insert(attr);
+        }
+    }
+
+    /** @brief returns the node vector and stops any further schema extension by writeAttr
+     * @return const parquet::schema::NodeVector&
+     */
+    inline const parquet::schema::NodeVector& getNodeVector() {
+        sharedNodeVector = true;
+        return myNodeVector;
+    }
+
+    /// @brief Returns whether at least one element is currently open
+    bool wroteHeader() const override {
+        return !myXMLStack.empty();
+    }
+
+    /**
+     * @brief Get the stack of currently open elements
+     *
+     * @return std::vector<_Tag *>&
+     */
+    inline std::vector& getStack() {
+        return myXMLStack;
+    }
+
+    /**
+     * @brief Write the header. (This has no effect for the ParquetFormatter)
+     *
+     * @return always true
+     */
+    inline bool writeHeader([[maybe_unused]] StreamDevice& into, [[maybe_unused]] const SumoXMLTag& rootElement) override { return true; };
+
+
+    /// @brief raw output is not supported; always throws std::runtime_error
+    template
+    void writeRaw(StreamDevice& into, T& val) {
+        UNUSED_PARAMETER(into);
+        UNUSED_PARAMETER(val);
+        throw std::runtime_error("writeRaw not implemented for ParquetFormatter");
+    }
+
+    /// @brief Returns the number of currently open elements
+    int getDepth() const {
+        return static_cast(myXMLStack.size());
+    }
+
+    /// @brief Drops all open elements together with the schema fields collected so far
+    void clearStack() {
+        myXMLStack.clear();
+        myNodeVector.clear();
+        fields.clear();
+    }
+
+
+private:
+    /// @brief The stack of begun xml XMLElements.
+    /// We don't need to store the full XMLElement, just the value
+    std::vector myXMLStack;
+
+    /// @brief The parquet node vector
+    parquet::schema::NodeVector myNodeVector;
+
+    /// @brief flag to determine if we have shared NodeVector
+    bool sharedNodeVector{false};
+
+    // @brief the set of unique fields
+    std::set fields;
+};
+// ===========================================================================
+#endif // HAVE_PARQUET
\ No newline at end of file
diff --git a/src/utils/iodevices/PlainXMLFormatter.cpp b/src/utils/iodevices/PlainXMLFormatter.cpp
index 561b9c6ce81d..6cb27c6b9121 100644
--- a/src/utils/iodevices/PlainXMLFormatter.cpp
+++ b/src/utils/iodevices/PlainXMLFormatter.cpp
@@ -23,6 +23,7 @@ #include #include #include "PlainXMLFormatter.h" +#include "StreamDevices.h" // ===========================================================================
@@ -34,7 +35,7 @@ PlainXMLFormatter::PlainXMLFormatter(const int defaultIndentation) bool -PlainXMLFormatter::writeHeader(std::ostream& into, const SumoXMLTag& rootElement) { +PlainXMLFormatter::writeHeader(StreamDevice& into, const SumoXMLTag& rootElement) { if (myXMLStack.empty()) { OptionsCont::getOptions().writeXMLHeader(into); openTag(into, rootElement);
@@ -45,13 +46,13 @@ PlainXMLFormatter::writeHeader(std::ostream& into, const SumoXMLTag& rootElement bool -PlainXMLFormatter::writeXMLHeader(std::ostream& into, const std::string& rootElement, +PlainXMLFormatter::writeXMLHeader(StreamDevice& into, const std::string& rootElement, const std::map& attrs, bool includeConfig) { if (myXMLStack.empty()) { OptionsCont::getOptions().writeXMLHeader(into, includeConfig); openTag(into, rootElement); for (std::map::const_iterator it = attrs.begin(); it != attrs.end(); ++it) { - writeAttr(into, it->first, it->second); + writeAttr(into, toString(it->first), it->second); } into << ">\n"; myHavePendingOpener = false;
@@ -62,24 +63,24 @@ PlainXMLFormatter::writeXMLHeader(std::ostream& into, const std::string& rootEle void
-PlainXMLFormatter::openTag(std::ostream& into, const std::string& xmlElement) { +PlainXMLFormatter::openTag(StreamDevice& into, const std::string& xmlElement) { if (myHavePendingOpener) { into << ">\n"; } myHavePendingOpener = true; - into << std::string(4 * (myXMLStack.size() + myDefaultIndentation), ' ') << "<" << xmlElement; + into << std::string(4 * (myXMLStack.size() + myDefaultIndentation), ' ').c_str() << "<" << xmlElement; myXMLStack.push_back(xmlElement); } void -PlainXMLFormatter::openTag(std::ostream& into, const SumoXMLTag& xmlElement) { +PlainXMLFormatter::openTag(StreamDevice& into, const SumoXMLTag& xmlElement) { openTag(into, toString(xmlElement)); } bool -PlainXMLFormatter::closeTag(std::ostream& into, const std::string& comment) { +PlainXMLFormatter::closeTag(StreamDevice& into, const std::string& comment) { if (!myXMLStack.empty()) { if (myHavePendingOpener) { into << "/>" << comment << "\n"; @@ -96,7 +97,7 @@ PlainXMLFormatter::closeTag(std::ostream& into, const std::string& comment) { void -PlainXMLFormatter::writePreformattedTag(std::ostream& into, const std::string& val) { +PlainXMLFormatter::writePreformattedTag(StreamDevice& into, const std::string& val) { if (myHavePendingOpener) { into << ">\n"; myHavePendingOpener = false; @@ -105,7 +106,7 @@ PlainXMLFormatter::writePreformattedTag(std::ostream& into, const std::string& v } void -PlainXMLFormatter::writePadding(std::ostream& into, const std::string& val) { +PlainXMLFormatter::writePadding(StreamDevice& into, const std::string& val) { into << val; } diff --git a/src/utils/iodevices/PlainXMLFormatter.h b/src/utils/iodevices/PlainXMLFormatter.h index 885058a1cd52..33d920e5b0d1 100644 --- a/src/utils/iodevices/PlainXMLFormatter.h +++ b/src/utils/iodevices/PlainXMLFormatter.h @@ -26,6 +26,7 @@ #endif #include "OutputFormatter.h" +#include "StreamDevices.h" // =========================================================================== @@ -44,7 +45,7 @@ class PlainXMLFormatter : public 
OutputFormatter { /// @brief Destructor - virtual ~PlainXMLFormatter() { } + virtual ~PlainXMLFormatter() = default; /** @brief Writes an XML header with optional configuration @@ -57,7 +58,7 @@ class PlainXMLFormatter : public OutputFormatter { * @param[in] attrs Additional attributes to save within the rootElement * @todo Describe what is saved */ - bool writeXMLHeader(std::ostream& into, const std::string& rootElement, + bool writeXMLHeader(StreamDevice& into, const std::string& rootElement, const std::map& attrs, bool includeConfig = true); @@ -70,7 +71,7 @@ class PlainXMLFormatter : public OutputFormatter { * @param[in] into The output stream to use * @param[in] rootElement The root element to use */ - bool writeHeader(std::ostream& into, const SumoXMLTag& rootElement); + bool writeHeader(StreamDevice& into, const SumoXMLTag& rootElement); /** @brief Opens an XML tag @@ -83,7 +84,7 @@ class PlainXMLFormatter : public OutputFormatter { * @param[in] xmlElement Name of element to open * @return The OutputDevice for further processing */ - void openTag(std::ostream& into, const std::string& xmlElement); + void openTag(StreamDevice& into, const std::string& xmlElement); /** @brief Opens an XML tag @@ -93,7 +94,7 @@ class PlainXMLFormatter : public OutputFormatter { * @param[in] into The output stream to use * @param[in] xmlElement Id of the element to open */ - void openTag(std::ostream& into, const SumoXMLTag& xmlElement); + void openTag(StreamDevice& into, const SumoXMLTag& xmlElement); /** @brief Closes the most recently opened tag @@ -102,7 +103,7 @@ class PlainXMLFormatter : public OutputFormatter { * @return Whether a further element existed in the stack and could be closed * @todo it is not verified that the topmost element was closed */ - bool closeTag(std::ostream& into, const std::string& comment = ""); + bool closeTag(StreamDevice& into, const std::string& comment = ""); /** @brief writes a preformatted tag to the device but ensures that any @@ -110,40 
+111,37 @@ class PlainXMLFormatter : public OutputFormatter { * @param[in] into The output stream to use * @param[in] val The preformatted data */ - void writePreformattedTag(std::ostream& into, const std::string& val); + void writePreformattedTag(StreamDevice& into, const std::string& val); /** @brief writes arbitrary padding */ - void writePadding(std::ostream& into, const std::string& val); + void writePadding(StreamDevice& into, const std::string& val); - - /** @brief writes an arbitrary attribute + /** @brief writes a named attribute * * @param[in] into The output stream to use * @param[in] attr The attribute (name) * @param[in] val The attribute value */ template - static void writeAttr(std::ostream& into, const std::string& attr, const T& val) { + void writeAttr(StreamDevice& into, const std::string& attr, const T& val) { into << " " << attr << "=\"" << toString(val, into.precision()) << "\""; } - - /** @brief writes a named attribute - * - * @param[in] into The output stream to use - * @param[in] attr The attribute (name) - * @param[in] val The attribute value - */ - template - static void writeAttr(std::ostream& into, const SumoXMLAttr attr, const T& val) { - into << " " << toString(attr) << "=\"" << toString(val, into.precision()) << "\""; + void writeAttr(StreamDevice& into, const std::string& attr, const double& val){ +#ifdef HAVE_FMT + fmt::print(into.getOStream(), " {}=\"{:.{}f}\"", attr, val, into.precision()); +#else + into << " " << attr << "=\"" << val << "\""; +#endif } bool wroteHeader() const { return !myXMLStack.empty(); } + + private: /// @brief The stack of begun xml elements std::vector myXMLStack; @@ -156,20 +154,20 @@ class PlainXMLFormatter : public OutputFormatter { }; -// =========================================================================== -// specialized template implementations (for speedup) -// =========================================================================== -template <> -inline void 
PlainXMLFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val) { -#ifdef HAVE_FMT - fmt::print(into, " {}=\"{:.{}f}\"", toString(attr), val, into.precision()); -#else - into << " " << toString(attr) << "=\"" << val << "\""; -#endif -} +// // =========================================================================== +// // specialized template implementations (for speedup) +// // =========================================================================== +// template <> +// inline void PlainXMLFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val) { +// #ifdef HAVE_FMT +// fmt::print(into, " {}=\"{:.{}f}\"", toString(attr), val, into.precision()); +// #else +// into << " " << toString(attr) << "=\"" << val << "\""; +// #endif +// } -template <> -inline void PlainXMLFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const std::string& val) { - into << " " << toString(attr) << "=\"" << val << "\""; -} +// template <> +// inline void PlainXMLFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const std::string& val) { +// into << " " << toString(attr) << "=\"" << val << "\""; +// }
diff --git a/src/utils/iodevices/StreamDevices.h b/src/utils/iodevices/StreamDevices.h
new file mode 100644
index 000000000000..d192415b9f25
--- /dev/null
+++ b/src/utils/iodevices/StreamDevices.h
@@ -0,0 +1,363 @@
+
+#pragma once
+#include
+#include
+#include
+#include
+
+#ifdef HAVE_PARQUET
+#include
+#include
+
+#include
+#include
+#include
+#endif
+
+
+/**
+ * @class StreamDevice
+ * @brief Abstract sink the output formatters write to
+ */
+class StreamDevice {
+public:
+
+    /// @brief The type of the stream
+    enum Type {
+        OSTREAM, // std::ostream (or std::ofstream)
+        COUT, // std::cout
+        PARQUET // parquet::StreamWriter
+    };
+
+    /// @brief Constructor taking the stream type and whether raw writes are allowed
+    StreamDevice(Type type, bool access) : rawWriteAccess(access), myType(type) {};
+    /// @brief Default constructor (type OSTREAM, no raw write access)
+    StreamDevice() = default;
+
+    /// @brief Destructor
+    virtual ~StreamDevice() = default;
+
+    /// @brief is the stream ok
+    /// @return true if the stream is ok
+    virtual bool ok() = 0;
+
+    /// @brief flush the stream
+    /// @return this
+    virtual StreamDevice& flush() = 0;
+
+    /// @brief close the stream
+    virtual void close() = 0;
+
+    /// @brief is the stream good
+    /// @return true if the stream is good
+    virtual bool good() = 0;
+
+    /// @brief read the stream into a string
+    /// @return the string
+    virtual std::string str() = 0;
+
+    /// @brief set the precision
+    /// @param precision
+    virtual void setPrecision(int precision) = 0;
+
+    /// @brief get the precision
+    /// @return the precision
+    virtual int precision() = 0;
+
+    /// @brief implement a stream operator
+    virtual operator std::ostream& () = 0;
+
+    /// @brief set the content of the stream
+    /// @param s the new content
+    virtual void str(const std::string& s) = 0;
+
+    /// @brief write an endline to the stream
+    /// @return this
+    virtual StreamDevice& endLine() = 0;
+
+    /// @brief set the output stream flags
+    /// @param flags the flags to set
+    virtual void setOSFlags(std::ios_base::fmtflags flags) = 0;
+
+    /// @brief get the type of the stream
+    virtual Type type() const {
+        return myType;
+    };
+
+    /// @brief allow raw output
+    bool allowRaw() const {
+        return rawWriteAccess;
+    }
+
+    /// @brief access to the wrapped std::ostream; throws std::runtime_error unless a subclass overrides it
+    virtual std::ostream& getOStream() {
+        throw std::runtime_error("Not implemented");
+    }
+
+protected:
+    /// @brief allow raw write access
+    bool rawWriteAccess = false;
+    /// @brief the type of the stream
+    Type myType = Type::OSTREAM;
+
+};
+
+/**
+ * @class OStreamDevice
+ * @brief Stream device wrapping a std::ostream that it owns
+ */
+class OStreamDevice : public StreamDevice {
+public:
+
+    // the pointer constructors take ownership: the stream is deleted together with the device
+    OStreamDevice(std::ofstream* stream) : StreamDevice(Type::OSTREAM, true), myStream(std::move(stream)) {}
+    OStreamDevice(std::ostream* stream) : StreamDevice(Type::OSTREAM, true), myStream(std::move(stream)) {}
+    // moves the given file stream into a heap object owned by the device
+    OStreamDevice(std::ofstream stream) : StreamDevice(Type::OSTREAM, true), myStream(new std::ofstream(std::move(stream))) {}
+
+    virtual ~OStreamDevice() override = default;
+
+    bool ok() override {
+        return myStream->good();
+    }
+
+    StreamDevice& flush() override {
+        myStream->flush();
+        return *this;
+    }
+
+    /// @brief only flushes; the wrapped stream itself is released when the device is destroyed
+    void close() override {
+        myStream->flush();
+    }
+
+    template
+    StreamDevice& print(const T& t) {
+        (*myStream) << t;
+        return *this;
+    }
+
+    void setPrecision(int precision) override {
+        (*myStream) << std::setprecision(precision);
+    }
+
+    void setOSFlags(std::ios_base::fmtflags flags) override {
+        myStream->setf(flags);
+    }
+
+    int precision() override {
+        return static_cast<int>(myStream->precision());
+    }
+
+    bool good() override {
+        return myStream->good();
+    }
+
+    std::string str() override {
+        // Try casting to ostringstream
+        if (auto* oss_ptr = dynamic_cast(myStream.get())) {
+            return oss_ptr->str();
+        }
+
+        // Try casting to stringstream
+        if (auto* ss_ptr = dynamic_cast(myStream.get())) {
+            return ss_ptr->str();
+        }
+
+        // If it's neither, we need to use a more general approach
+        std::ostringstream oss;
+        oss << myStream->rdbuf();
+        return oss.str();
+    }
+
+    /** @brief sets the content of the stream
+     *
+     * A string-backed stream gets its buffer replaced by s, so that str("")
+     * really empties it (s used to be appended only, so the buffer never shrank).
+     * Any other stream cannot be rewound; s is simply written to it.
+     *
+     * @param s the new content
+     */
+    void str(const std::string& s) override {
+        if (auto* const buf = dynamic_cast<std::stringbuf*>(myStream->rdbuf())) {
+            buf->str(s);
+            if (!s.empty()) {
+                // keep appending behind the new content instead of overwriting it
+                myStream->seekp(0, std::ios_base::end);
+            }
+            return;
+        }
+        (*myStream) << s;
+    }
+
+    operator std::ostream& () override {
+        return *myStream;
+    }
+
+    StreamDevice& endLine() override {
+        (*myStream) << std::endl;
+        return *this;
+    }
+
+    // get the type of the stream
+    Type type() const override {
+        return Type::OSTREAM;
+    }
+
+    std::ostream& getOStream() override {
+        return *myStream;
+    }
+
+private:
+    /// @brief the wrapped stream, owned by the device
+    std::unique_ptr myStream;
+};
+
+
+class COUTStreamDevice : public StreamDevice {
+public:
+
+    // write a constructor that takes a std::ofstream
+    COUTStreamDevice() : StreamDevice(Type::COUT, true), myStream(std::cout) {};
+    COUTStreamDevice(std::ostream& stream) : StreamDevice(Type::COUT, true), myStream(stream) {};
+
+    virtual ~COUTStreamDevice() override = default;
+
+    bool
ok() override { + return myStream.good(); + } + + StreamDevice& flush() override { + myStream.flush(); + return *this; + } + + void close() override { + (void)(this->flush()); + } + + template + StreamDevice& print(const T& t) { + myStream << t; + return *this; + } + + void setPrecision(int precision) override { + myStream << std::setprecision(precision); + } + + void setOSFlags(std::ios_base::fmtflags flags) override { + myStream.setf(flags); + } + + int precision() override { + return static_cast(myStream.precision()); + } + + bool good() override { + return myStream.good(); + } + + std::string str() override { + return ""; + } + + void str(const std::string& s) override { + myStream << s; + } + + operator std::ostream& () override { + return myStream; + } + + StreamDevice& endLine() override { + myStream << std::endl; + return *this; + } + + std::ostream& getOStream() override { + return myStream; + } + +private: + + std::ostream& myStream; + +}; // Add the missing semicolon here + +class ParquetStream : public StreamDevice { + +#ifdef HAVE_PARQUET + +public: + + ParquetStream(std::unique_ptr file) : StreamDevice(Type::PARQUET, false) { + myStream = std::make_unique(std::move(file)); + }; + + virtual ~ParquetStream() = default; + + bool ok() override { + return true; + } + + bool good() override { + // check that the stream is not null + return myStream != nullptr; + } + + StreamDevice& flush() override { + // do nothing + return *this; + } + + void close() override { + myStream->EndRowGroup(); + myStream.release(); + } + + void setPrecision(int precision) override { + UNUSED_PARAMETER(precision); + } + + std::string str() override { + return ""; + } + + template + void print(const T& t) { + (*myStream) << t; + } + + // protect this to only allow types that are supported by parquet + + + void setOSFlags(std::ios_base::fmtflags flags) override {UNUSED_PARAMETER(flags);} + + operator std::ostream& () override { + throw std::runtime_error("Not implemented"); + } + 
+ void str(const std::string& s) override { + UNUSED_PARAMETER(s); + throw std::runtime_error("Not implemented"); + }; + + StreamDevice& endLine() override { + myStream->EndRow(); + return *this; + } + + // get the type of the stream + Type type() const override { + return Type::PARQUET; + } + + int precision() override { + return 0; + } + +private: + std::unique_ptr myStream; + +#endif +}; + +// implement a templated stream operator. The base class does nothing +template +StreamDevice& operator<<(StreamDevice& stream, const T& t) { + switch (stream.type()) { + case StreamDevice::Type::OSTREAM: + static_cast(&stream)->print(t); + break; + case StreamDevice::Type::COUT: + static_cast(&stream)->print(t); + break; + case StreamDevice::Type::PARQUET: +#ifdef HAVE_PARQUET + static_cast(&stream)->print(t); + // throw std::runtime_error("Parquet not supported in this build"); +#else + throw std::runtime_error("Parquet not supported in this build"); +#endif + break; + default: + // assert that this does not happen + throw std::runtime_error("Unknown stream type in StreamDevice"); + } + return stream; +}