Skip to content

Commit

Permalink
New DAQ source development for phase-2 and scouting:
Browse files Browse the repository at this point in the history
- Based on Run2 source, DAQSource class with separate classes implementing data-format specifics.
- A general "FRD" like format is expected with a file header describing number of entries, and sequential entries describing either events or orbits.
- removing single-buffering support, file locking support (in source only)
- DaqProvenanceHelper adapted to support injection of other object types apart from FEDRawDataCollection and injection by a different source
- format-specific (model) in a separate source file and class
- add data type to FRD file header and refactor class definition of the header
- Update to FRD file header V2: header of 32 bytes, data type, including run number and 32-bit event counter
- dump scouting data module for testing
- scouting source support for multiple files by scouting models
  • Loading branch information
smorovic committed Mar 6, 2023
1 parent fd99ce3 commit a509731
Show file tree
Hide file tree
Showing 27 changed files with 3,534 additions and 118 deletions.
1 change: 1 addition & 0 deletions EventFilter/Utilities/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<use name="tbb"/>
<use name="DataFormats/FEDRawData"/>
<use name="DataFormats/TCDS"/>
<use name="DataFormats/L1Trigger"/>
<use name="FWCore/Framework"/>
<use name="FWCore/MessageLogger"/>
<use name="FWCore/ServiceRegistry"/>
Expand Down
209 changes: 209 additions & 0 deletions EventFilter/Utilities/interface/DAQSource.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
#ifndef EventFilter_Utilities_DAQSource_h
#define EventFilter_Utilities_DAQSource_h

#include <condition_variable>
#include <cstdio>
#include <filesystem>
#include <memory>
#include <mutex>
#include <thread>

#include "oneapi/tbb/concurrent_queue.h"
#include "oneapi/tbb/concurrent_vector.h"

#include "FWCore/Sources/interface/RawInputSource.h"
#include "FWCore/Framework/interface/EventPrincipal.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "DataFormats/Provenance/interface/ProcessHistoryID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"

#include "EventFilter/Utilities/interface/EvFDaqDirector.h"

//import InputChunk
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"


class FEDRawDataCollection;
class InputSourceDescription;
class ParameterSet;

struct RawInputFile;
//struct InputChunk;
class DataMode;

class DataModeFRD;

namespace evf {
class FastMonitoringService;
namespace FastMonState {
enum InputState : short;
}
} // namespace evf

class DAQSource : public edm::RawInputSource {
friend struct RawInputFile;
friend struct InputChunk;

public:
explicit DAQSource(edm::ParameterSet const&, edm::InputSourceDescription const&);
~DAQSource() override;
static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

std::pair<bool, unsigned int> getEventReport(unsigned int lumi, bool erase);
bool useL1EventID() const { return useL1EventID_; }
int currentLumiSection() const { return currentLumiSection_; }
int eventRunNumber() const { return eventRunNumber_; }
void makeEventWrapper(edm::EventPrincipal& eventPrincipal, edm::EventAuxiliary& aux) {
makeEvent(eventPrincipal, aux);
}
bool fileListLoopMode() { return fileListLoopMode_; }

edm::ProcessHistoryID& processHistoryID() { return processHistoryID_; }

protected:
Next checkNext() override;
void read(edm::EventPrincipal& eventPrincipal) override;
void setMonState(evf::FastMonState::InputState state);
void setMonStateSup(evf::FastMonState::InputState state);

private:
void rewind_() override;
inline evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock();
inline evf::EvFDaqDirector::FileStatus getNextDataBlock();

void maybeOpenNewLumiSection(const uint32_t lumiSection);

void readSupervisor();
void dataArranger();
void readWorker(unsigned int tid);
void threadError();
bool exceptionState() { return setExceptionState_; }

//monitoring
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events);

long initFileList();
evf::EvFDaqDirector::FileStatus getFile(unsigned int& ls,
std::string& nextFile,
uint32_t& fsize,
uint64_t& lockWaitTime);

//variables
evf::FastMonitoringService* fms_ = nullptr;
evf::EvFDaqDirector* daqDirector_ = nullptr;

const std::string dataModeConfig_;
unsigned int eventChunkSize_; // for buffered read-ahead
unsigned int eventChunkBlock_; // how much read(2) asks at the time
unsigned int readBlocks_;
unsigned int numBuffers_;
unsigned int maxBufferedFiles_;
unsigned int numConcurrentReads_;
std::atomic<unsigned int> readingFilesCount_;

// get LS from filename instead of event header
const bool alwaysStartFromFirstLS_;
const bool verifyChecksum_;
const bool useL1EventID_;
const std::vector<unsigned int> testTCDSFEDRange_;
std::vector<std::string> fileNames_;
bool useFileBroker_;
//std::vector<std::string> fileNamesSorted_;

const bool fileListMode_;
unsigned int fileListIndex_ = 0;
const bool fileListLoopMode_;
unsigned int loopModeIterationInc_ = 0;

edm::RunNumber_t runNumber_;
std::string fuOutputDir_;

edm::ProcessHistoryID processHistoryID_;

unsigned int currentLumiSection_;
uint32_t eventRunNumber_ = 0;
uint32_t GTPEventID_ = 0;
unsigned int eventsThisLumi_;
unsigned long eventsThisRun_ = 0;

/*
*
* Multithreaded file reader
*
**/

typedef std::pair<RawInputFile*, InputChunk*> ReaderInfo;

std::unique_ptr<RawInputFile> currentFile_;
bool chunkIsFree_ = false;

bool startedSupervisorThread_ = false;
std::unique_ptr<std::thread> readSupervisorThread_;
std::unique_ptr<std::thread> dataArrangerThread_;
std::vector<std::thread*> workerThreads_;

tbb::concurrent_queue<unsigned int> workerPool_;
std::vector<ReaderInfo> workerJob_;

tbb::concurrent_queue<InputChunk*> freeChunks_;
tbb::concurrent_queue<std::unique_ptr<RawInputFile>> fileQueue_;

std::mutex mReader_;
std::vector<std::condition_variable*> cvReader_;
std::vector<unsigned int> tid_active_;

std::atomic<bool> quit_threads_;
std::vector<unsigned int> thread_quit_signal;
bool setExceptionState_ = false;
std::mutex startupLock_;
std::condition_variable startupCv_;

int currentFileIndex_ = -1;
//std::list<std::pair<int, std::unique_ptr<RawInputFile>>> filesToDelete_;
std::list<std::pair<int, std::unique_ptr<InputFile>>> filesToDelete_;
std::list<std::pair<int, std::string>> fileNamesToDelete_;
std::mutex fileDeleteLock_;
std::vector<int> streamFileTracker_;
unsigned int nStreams_ = 0;
unsigned int checkEvery_ = 10;

//supervisor thread wakeup
std::mutex mWakeup_;
std::condition_variable cvWakeup_;

//variables for the single buffered mode
int fileDescriptor_ = -1;

std::atomic<bool> threadInit_;

std::map<unsigned int, unsigned int> sourceEventsReport_;
std::mutex monlock_;

std::shared_ptr<DataMode> dataMode_;

};

class RawInputFile: public InputFile {
public:
RawInputFile(evf::EvFDaqDirector::FileStatus status,
unsigned int lumi = 0,
std::string const& name = std::string(),
bool deleteFile = true,
int rawFd = -1,
uint64_t fileSize = 0,
uint16_t rawHeaderSize = 0,
uint32_t nChunks = 0,
int nEvents = 0,
DAQSource* parent = nullptr)
: InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr),
sourceParent_(parent) {
}
RawInputFile(std::string& name) : InputFile(name) {}
~RawInputFile() {}
bool advance(unsigned char*& dataPosition, const size_t size);
private:
DAQSource* sourceParent_;
};

#endif // EventFilter_Utilities_DAQSource_h

151 changes: 151 additions & 0 deletions EventFilter/Utilities/interface/DAQSourceModels.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#ifndef EventFilter_Utilities_DAQSourceModels_h
#define EventFilter_Utilities_DAQSourceModels_h

#include <condition_variable>
#include <cstdio>
#include <filesystem>
#include <memory>
#include <mutex>
#include <thread>

#include "tbb/concurrent_queue.h"
#include "tbb/concurrent_vector.h"

#include "DataFormats/Provenance/interface/ProcessHistoryID.h"
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "EventFilter/Utilities/interface/EvFDaqDirector.h"
#include "FWCore/Sources/interface/RawInputSource.h"
#include "FWCore/Framework/interface/EventPrincipal.h"
#include "FWCore/Sources/interface/DaqProvenanceHelper.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "IOPool/Streamer/interface/FRDEventMessage.h"

#include "DataFormats/FEDRawData/interface/FEDNumbering.h"
#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"

//import InputChunk
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"

class FEDRawDataCollection;
class DAQSource;

//evf?
class DataMode {

public:
DataMode(DAQSource *daqSource): daqSource_(daqSource) {}
virtual ~DataMode() = default;
virtual std::vector<std::shared_ptr<const edm::DaqProvenanceHelper>>& makeDaqProvenanceHelpers() = 0;
virtual void readEvent(edm::EventPrincipal& eventPrincipal) = 0;
virtual int dataVersion() const = 0;
virtual void detectVersion(unsigned char* fileBuf, uint32_t fileHeaderOffset) = 0;
virtual uint32_t headerSize() const = 0;
virtual bool versionCheck() const = 0;
virtual uint64_t dataBlockSize() const = 0;
virtual void makeDataBlockView(unsigned char* addr, size_t maxSize, std::vector<uint64_t> const& fileSizes, size_t fileHeaderSize) = 0;
virtual bool nextEventView() = 0;
virtual bool checksumValid() = 0;
virtual std::string getChecksumError() const = 0;
virtual bool isRealData() const = 0;
virtual uint32_t run() const = 0;
virtual bool dataBlockCompleted() const = 0;
virtual bool requireHeader() const = 0;

virtual bool dataBlockInitialized() const = 0;
virtual void setDataBlockInitialized(bool) = 0;

virtual void setTCDSSearchRange(uint16_t, uint16_t) = 0;
virtual std::pair<bool, std::vector<std::string>> defineAdditionalFiles(std::string const& primaryName) const = 0;
protected:
DAQSource *daqSource_;
};


class DataModeFRD : public DataMode {

public:
DataModeFRD(DAQSource *daqSource): DataMode(daqSource) {}
~DataModeFRD() {};
std::vector<std::shared_ptr<const edm::DaqProvenanceHelper>>& makeDaqProvenanceHelpers();
void readEvent(edm::EventPrincipal& eventPrincipal);

//non-virtual
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection& rawData, bool& tcdsInRange, unsigned char*& tcds_pointer);

int dataVersion() const { return detectedFRDversion_; }
void detectVersion(unsigned char* fileBuf, uint32_t fileHeaderOffset) {
detectedFRDversion_ = *((uint16_t*)(fileBuf + fileHeaderOffset));
}

uint32_t headerSize() const {
return FRDHeaderVersionSize[detectedFRDversion_];
}

bool versionCheck() const {
return detectedFRDversion_ <= FRDHeaderMaxVersion;
}

uint64_t dataBlockSize() const {
return event_->size();
}

void makeDataBlockView(unsigned char* addr, size_t maxSize, std::vector<uint64_t> const& fileSizes, size_t fileHeaderSize) {
dataBlockAddr_ = addr;
dataBlockMax_ = maxSize;
eventCached_=false;
nextEventView();
eventCached_=true;
}

bool nextEventView();
bool checksumValid();
std::string getChecksumError() const;

bool isRealData() const {
return event_->isRealData();
}

uint32_t run() const {
return event_->run();
}

//true for DAQ3 FRD
bool dataBlockCompleted() const {
return true;
}

bool requireHeader() const {
return true;
}

bool dataBlockInitialized() const { return true; }

void setDataBlockInitialized(bool) {};


void setTCDSSearchRange(uint16_t MINTCDSuTCAFEDID,uint16_t MAXTCDSuTCAFEDID) {
MINTCDSuTCAFEDID_ = MINTCDSuTCAFEDID;
MAXTCDSuTCAFEDID_ = MAXTCDSuTCAFEDID;
}

std::pair<bool, std::vector<std::string>> defineAdditionalFiles(std::string const& primaryName) const {
return std::make_pair(true, std::vector<std::string>());
}

private:
std::vector<std::shared_ptr<const edm::DaqProvenanceHelper>> daqProvenanceHelpers_;
uint16_t detectedFRDversion_ = 0;
size_t headerSize_ = 0;
std::unique_ptr<FRDEventMsgView> event_;
uint32_t crc_ = 0;
unsigned char * dataBlockAddr_ = 0;
size_t dataBlockMax_ = 0;
size_t fileHeaderSize_ = 0;
uint16_t MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID;
uint16_t MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID;
bool eventCached_ = false;
};


#endif // EventFilter_Utilities_DAQSourceModels_h

Loading

0 comments on commit a509731

Please sign in to comment.