Skip to content

Commit

Permalink
Use TTaskGroup interface to unzip baskets in parallel.
Browse files Browse the repository at this point in the history
  • Loading branch information
zzxuanyuan committed Oct 17, 2017
1 parent 34118f4 commit 4514b4e
Show file tree
Hide file tree
Showing 3 changed files with 514 additions and 770 deletions.
99 changes: 64 additions & 35 deletions tree/tree/inc/TTreeCacheUnzip.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,59 +25,91 @@
// //
//////////////////////////////////////////////////////////////////////////

#include "Bytes.h"
#include "TTreeCache.h"

#include "ROOT/TTaskGroup.hxx"
#include <atomic>
#include <queue>
#include <memory>
#include <vector>

class TTree;
class TBranch;
class TThread;
class TCondition;
class TBasket;
class TBranch;
class TMutex;
class TTree;

class TTreeCacheUnzip : public TTreeCache {

public:
// Parallel unzipping mode. There are three possibilities:
// enable, disable and force.
// kEnable is the default argument of SetParallelUnzip(); the currently
// selected mode is kept in the static member fgParallel.
enum EParUnzipMode { kEnable, kDisable, kForce };

// Unzipping states for a basket.
// NOTE(review): the enumerator names mirror the UnzipState accessors
// (IsUntouched/IsProgress/IsFinished and the matching setters); presumably
// these are the values stored in UnzipState::fUnzipStatus -- confirm against
// the implementation file.
enum EUnzipState { kUntouched, kProgress, kFinished };

protected:
// Unzipping state for baskets
/// Per-basket unzipping bookkeeping: the unzipped chunk, its length and an
/// atomic status byte for every entry.
///
/// The struct owns the two arrays held in fUnzipChunks and fUnzipStatus and
/// releases them with delete[] in its destructor. Because of that ownership it
/// is explicitly non-copyable: an implicitly generated copy would leave two
/// objects pointing at the same arrays and both destructors would free them.
struct UnzipState {
   // Note (from the original author): fUnzipChunks is kept as a raw array of
   // std::unique_ptr<char[]> instead of std::unique_ptr<std::unique_ptr<char[]>[]>
   // or a vector of unique_ptr, the stated reason being that std::unique_ptr is
   // not copy constructible.
   // NOTE(review): the original note also mentions a possible future upgrade
   // related to C++14 ("make_vector"); the intent is unclear -- confirm with the
   // author before changing the storage type.
   std::unique_ptr<char[]> *fUnzipChunks; ///<! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
   std::vector<Int_t> fUnzipLen; ///<! [fNseek] Length of the unzipped buffers
   std::atomic<Byte_t> *fUnzipStatus; ///<! [fNSeek]

   /// Start with no arrays allocated.
   UnzipState() : fUnzipChunks(nullptr), fUnzipStatus(nullptr) {}

   /// Release both owned arrays. delete[] on a null pointer is a no-op, so no
   /// explicit null checks are needed.
   ~UnzipState() {
      delete [] fUnzipChunks;
      delete [] fUnzipStatus;
   }

   // Copying would alias the owned arrays and lead to a double delete[].
   UnzipState(const UnzipState &) = delete;
   UnzipState &operator=(const UnzipState &) = delete;

   // The members below are only declared here; their definitions live in the
   // implementation file. `index` presumably addresses one basket slot in the
   // arrays above -- confirm against the implementation.

   void Clear(Int_t size);

   // Status queries for the slot at `index`.
   Bool_t IsUntouched(Int_t index) const;
   Bool_t IsProgress(Int_t index) const;
   Bool_t IsFinished(Int_t index) const;
   Bool_t IsUnzipped(Int_t index) const;

   void Reset(Int_t oldSize, Int_t newSize);

   // Status updates for the slot at `index`.
   void SetUntouched(Int_t index);
   void SetProgress(Int_t index);
   void SetFinished(Int_t index);
   void SetMissed(Int_t index);
   void SetUnzipped(Int_t index, char* buf, Int_t len);

   Bool_t TryUnzipping(Int_t index);
};

typedef struct UnzipState UnzipState_t;
UnzipState_t fUnzipState;

// Members for managing parallelism
TThread *fUnzipThread[10];
Bool_t fActiveThread; ///< Used to terminate gracefully the unzippers
TCondition *fUnzipStartCondition; ///< Used to signal the threads to start.
TCondition *fUnzipDoneCondition; ///< Used to wait for an unzip tour to finish. Gives the Async feel.
Bool_t fParallel; ///< Indicate if we want to activate the parallelism (for this instance)
Bool_t fAsyncReading;
TMutex *fMutexList; ///< Mutex to protect the various lists. Used by the condvars.
Bool_t fEmpty;
Int_t fCycle;
Bool_t fParallel; ///< Indicate if we want to activate the parallelism (for this instance)

TMutex *fIOMutex;

Int_t fCycle;
static TTreeCacheUnzip::EParUnzipMode fgParallel; ///< Indicate if we want to activate the parallelism

Int_t fLastReadPos;
Int_t fBlocksToGo;
// IMT TTaskGroup Manager
#ifdef R__USE_IMT
std::unique_ptr<ROOT::Experimental::TTaskGroup> fUnzipTaskGroup;
#endif

// Unzipping related members
Int_t *fUnzipLen; ///<! [fNseek] Length of the unzipped buffers
char **fUnzipChunks; ///<! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
Byte_t *fUnzipStatus; ///<! [fNSeek] For each blk, tells us if it's unzipped or pending
Long64_t fTotalUnzipBytes; ///<! The total sum of the currently unzipped blks

Int_t fNseekMax; ///<! fNseek can change so we need to know its max size
Int_t fUnzipGroupSize; ///<! Min accumulated size of a group of baskets ready to be unzipped by a IMT task
Long64_t fUnzipBufferSize; ///<! Max Size for the ready unzipped blocks (default is 2*fBufferSize)

static Double_t fgRelBuffSize; ///< This is the percentage of the TTreeCacheUnzip that will be used

// Members used to keep statistics
Int_t fNUnzip; ///<! number of blocks that were unzipped
Int_t fNFound; ///<! number of blocks that were found in the cache
Int_t fNStalls; ///<! number of hits which caused a stall
Int_t fNMissed; ///<! number of blocks that were not found in the cache and were unzipped

std::queue<Int_t> fActiveBlks; ///< The blocks which are active now
Int_t fNStalls; ///<! number of hits which caused a stall
Int_t fNUnzip; ///<! number of blocks that were unzipped

private:
TTreeCacheUnzip(const TTreeCacheUnzip &); //this class cannot be copied
Expand All @@ -88,13 +120,12 @@ class TTreeCacheUnzip : public TTreeCache {

// Private methods
void Init();
Int_t StartThreadUnzip(Int_t nthreads);
Int_t StopThreadUnzip();

public:
TTreeCacheUnzip();
TTreeCacheUnzip(TTree *tree, Int_t buffersize=0);
virtual ~TTreeCacheUnzip();

virtual Int_t AddBranch(TBranch *b, Bool_t subbranches = kFALSE);
virtual Int_t AddBranch(const char *branch, Bool_t subbranches = kFALSE);
Bool_t FillBuffer();
Expand All @@ -108,31 +139,29 @@ class TTreeCacheUnzip : public TTreeCache {
static Bool_t IsParallelUnzip();
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option = TTreeCacheUnzip::kEnable);

Bool_t IsActiveThread();
Bool_t IsQueueEmpty();

void WaitUnzipStartSignal();
void SendUnzipStartSignal(Bool_t broadcast);

// Unzipping related methods
#ifdef R__USE_IMT
Int_t CreateTasks();
#endif
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen);
virtual void ResetCache();
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free);
Int_t GetUnzipGroupSize() { return fUnzipGroupSize; }
virtual void ResetCache();
virtual Int_t SetBufferSize(Int_t buffersize);
void SetUnzipBufferSize(Long64_t bufferSize);
void SetUnzipGroupSize(Int_t groupSize) { fUnzipGroupSize = groupSize; }
static void SetUnzipRelBufferSize(Float_t relbufferSize);
Int_t UnzipBuffer(char **dest, char *src);
Int_t UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff);
Int_t UnzipCache(Int_t index);

// Methods to get stats
Int_t GetNUnzip() { return fNUnzip; }
Int_t GetNFound() { return fNFound; }
Int_t GetNMissed(){ return fNMissed; }
Int_t GetNFound() { return fNFound; }

void Print(Option_t* option = "") const;

// static members
static void* UnzipLoop(void *arg);
ClassDef(TTreeCacheUnzip,0) //Specialization of TTreeCache for parallel unzipping
};

Expand Down
2 changes: 2 additions & 0 deletions tree/tree/src/TTree.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -8318,9 +8318,11 @@ Int_t TTree::SetCacheSizeAux(Bool_t autocache /* = kTRUE */, Long64_t cacheSize
return 0;
}

#ifdef R__USE_IMT
if(TTreeCacheUnzip::IsParallelUnzip() && file->GetCompressionLevel() > 0)
pf = new TTreeCacheUnzip(this, cacheSize);
else
#endif
pf = new TTreeCache(this, cacheSize);

pf->SetAutoCreated(autocache);
Expand Down
Loading

0 comments on commit 4514b4e

Please sign in to comment.