Skip to content

Commit

Permalink
Use TTaskGroup interface to unzip baskets in parallel.
Browse files Browse the repository at this point in the history
  • Loading branch information
zzxuanyuan committed Sep 18, 2017
1 parent 353af50 commit e6df15d
Show file tree
Hide file tree
Showing 2 changed files with 378 additions and 654 deletions.
27 changes: 9 additions & 18 deletions tree/tree/inc/TTreeCacheUnzip.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

#include <queue>

#include "ROOT/TTaskGroup.hxx"
#include <atomic>

class TTree;
class TBranch;
class TThread;
Expand All @@ -37,34 +40,30 @@ class TBasket;
class TMutex;

class TTreeCacheUnzip : public TTreeCache {

public:
// We have three possibilities for the unzipping mode:
// enable, disable and force
enum EParUnzipMode { kEnable, kDisable, kForce };

protected:

ROOT::Experimental::TTaskGroup *fUnzipTaskGroup;

// Members for paral. managing
TThread *fUnzipThread[10];
Bool_t fActiveThread; ///< Used to terminate gracefully the unzippers
TCondition *fUnzipStartCondition; ///< Used to signal the threads to start.
TCondition *fUnzipDoneCondition; ///< Used to wait for an unzip tour to finish. Gives the Async feel.
Bool_t fParallel; ///< Indicate if we want to activate the parallelism (for this instance)
Bool_t fAsyncReading;
TMutex *fMutexList; ///< Mutex to protect the various lists. Used by the condvars.
TMutex *fIOMutex;

Int_t fCycle;
static TTreeCacheUnzip::EParUnzipMode fgParallel; ///< Indicate if we want to activate the parallelism

Int_t fLastReadPos;
Int_t fBlocksToGo;

// Unzipping related members
Int_t *fUnzipLen; ///<! [fNseek] Length of the unzipped buffers
char **fUnzipChunks; ///<! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
Byte_t *fUnzipStatus; ///<! [fNSeek] For each blk, tells us if it's unzipped or pending
Long64_t fTotalUnzipBytes; ///<! The total sum of the currently unzipped blks
std::atomic<Byte_t> *fUnzipStatus; ///<! [fNSeek]

Int_t fNseekMax; ///<! fNseek can change so we need to know its max size
Long64_t fUnzipBufferSize; ///<! Max Size for the ready unzipped blocks (default is 2*fBufferSize)
Expand All @@ -77,8 +76,6 @@ class TTreeCacheUnzip : public TTreeCache {
Int_t fNStalls; ///<! number of hits which caused a stall
Int_t fNMissed; ///<! number of blocks that were not found in the cache and were unzipped

std::queue<Int_t> fActiveBlks; ///< The blocks which are active now

private:
TTreeCacheUnzip(const TTreeCacheUnzip &); //this class cannot be copied
TTreeCacheUnzip& operator=(const TTreeCacheUnzip &);
Expand All @@ -88,8 +85,6 @@ class TTreeCacheUnzip : public TTreeCache {

// Private methods
void Init();
Int_t StartThreadUnzip(Int_t nthreads);
Int_t StopThreadUnzip();

public:
TTreeCacheUnzip();
Expand All @@ -108,12 +103,8 @@ class TTreeCacheUnzip : public TTreeCache {
static Bool_t IsParallelUnzip();
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option = TTreeCacheUnzip::kEnable);

Bool_t IsActiveThread();
Bool_t IsQueueEmpty();

void WaitUnzipStartSignal();
void SendUnzipStartSignal(Bool_t broadcast);

// Unzipping related methods
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen);
virtual void ResetCache();
Expand All @@ -122,7 +113,8 @@ class TTreeCacheUnzip : public TTreeCache {
void SetUnzipBufferSize(Long64_t bufferSize);
static void SetUnzipRelBufferSize(Float_t relbufferSize);
Int_t UnzipBuffer(char **dest, char *src);
Int_t UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff);
Int_t UnzipCache(Int_t index, Int_t &locbuffsz, char *&locbuff);
Int_t CreateTasks();

// Methods to get stats
Int_t GetNUnzip() { return fNUnzip; }
Expand All @@ -132,7 +124,6 @@ class TTreeCacheUnzip : public TTreeCache {
void Print(Option_t* option = "") const;

// static members
static void* UnzipLoop(void *arg);
ClassDef(TTreeCacheUnzip,0) //Specialization of TTreeCache for parallel unzipping
};

Expand Down
Loading

0 comments on commit e6df15d

Please sign in to comment.