Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use TTaskGroup interface to unzip baskets in parallel. #1010

Merged
merged 6 commits into from
Feb 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 64 additions & 35 deletions tree/tree/inc/TTreeCacheUnzip.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,59 +25,91 @@
// //
//////////////////////////////////////////////////////////////////////////

#include "Bytes.h"
#include "TTreeCache.h"

#include "ROOT/TTaskGroup.hxx"
#include <atomic>
#include <queue>
#include <memory>
#include <vector>

class TTree;
class TBranch;
class TThread;
class TCondition;
class TBasket;
class TBranch;
class TMutex;
class TTree;

class TTreeCacheUnzip : public TTreeCache {

public:
// Parallel unzipping mode. We have three possibilities: enable, disable and force.
// This is the type of the static fgParallel member and of the argument of
// SetParallelUnzip(), whose default value is kEnable.
enum EParUnzipMode { kEnable, kDisable, kForce };

// Unzipping states for a basket.
// NOTE(review): presumably kUntouched = not processed yet, kProgress = currently being
// unzipped, kFinished = unzipping done -- confirm against the implementation file.
enum EUnzipState { kUntouched, kProgress, kFinished };

protected:
// Unzipping state for baskets
struct UnzipState {
// Note: we cannot use std::unique_ptr<std::unique_ptr<char[]>[]> or vector of unique_ptr
// for fUnzipChunks since std::unique_ptr is not copy-constructible.
// However, in a future upgrade we cannot use make_vector in C++14.
std::unique_ptr<char[]> *fUnzipChunks; ///<! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
std::vector<Int_t> fUnzipLen; ///<! [fNseek] Length of the unzipped buffers
std::atomic<Byte_t> *fUnzipStatus; ///<! [fNSeek]

UnzipState() {
fUnzipChunks = nullptr;
fUnzipStatus = nullptr;
}
// Free the chunk and status arrays. No null test is needed because
// delete[] applied to a null pointer does nothing.
~UnzipState()
{
   delete[] fUnzipChunks;
   delete[] fUnzipStatus;
}
void Clear(Int_t size);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sort the members alphabetically (at least within each functional grouping).

Bool_t IsUntouched(Int_t index) const;
Bool_t IsProgress(Int_t index) const;
Bool_t IsFinished(Int_t index) const;
Bool_t IsUnzipped(Int_t index) const;
void Reset(Int_t oldSize, Int_t newSize);
void SetUntouched(Int_t index);
void SetProgress(Int_t index);
void SetFinished(Int_t index);
void SetMissed(Int_t index);
void SetUnzipped(Int_t index, char* buf, Int_t len);
Bool_t TryUnzipping(Int_t index);
};

typedef struct UnzipState UnzipState_t;
UnzipState_t fUnzipState;

// Members for managing parallelism
TThread *fUnzipThread[10];
Bool_t fActiveThread; ///< Used to terminate gracefully the unzippers
TCondition *fUnzipStartCondition; ///< Used to signal the threads to start.
TCondition *fUnzipDoneCondition; ///< Used to wait for an unzip tour to finish. Gives the Async feel.
Bool_t fParallel; ///< Indicate if we want to activate the parallelism (for this instance)
Bool_t fAsyncReading;
TMutex *fMutexList; ///< Mutex to protect the various lists. Used by the condvars.
Bool_t fEmpty;
Int_t fCycle;
Bool_t fParallel; ///< Indicate if we want to activate the parallelism (for this instance)

TMutex *fIOMutex;

Int_t fCycle;
static TTreeCacheUnzip::EParUnzipMode fgParallel; ///< Indicate if we want to activate the parallelism

Int_t fLastReadPos;
Int_t fBlocksToGo;
// IMT TTaskGroup Manager
#ifdef R__USE_IMT
std::unique_ptr<ROOT::Experimental::TTaskGroup> fUnzipTaskGroup;
#endif

// Unzipping related members
Int_t *fUnzipLen; ///<! [fNseek] Length of the unzipped buffers
char **fUnzipChunks; ///<! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
Byte_t *fUnzipStatus; ///<! [fNSeek] For each blk, tells us if it's unzipped or pending
Long64_t fTotalUnzipBytes; ///<! The total sum of the currently unzipped blks

Int_t fNseekMax; ///<! fNseek can change so we need to know its max size
Int_t fUnzipGroupSize; ///<! Min accumulated size of a group of baskets ready to be unzipped by a IMT task
Long64_t fUnzipBufferSize; ///<! Max Size for the ready unzipped blocks (default is 2*fBufferSize)

static Double_t fgRelBuffSize; ///< This is the percentage of the TTreeCacheUnzip that will be used

// Members used to keep statistics
Int_t fNUnzip; ///<! number of blocks that were unzipped
Int_t fNFound; ///<! number of blocks that were found in the cache
Int_t fNStalls; ///<! number of hits which caused a stall
Int_t fNMissed; ///<! number of blocks that were not found in the cache and were unzipped

std::queue<Int_t> fActiveBlks; ///< The blocks which are active now
Int_t fNStalls; ///<! number of hits which caused a stall
Int_t fNUnzip; ///<! number of blocks that were unzipped

private:
TTreeCacheUnzip(const TTreeCacheUnzip &); //this class cannot be copied
Expand All @@ -88,13 +120,12 @@ class TTreeCacheUnzip : public TTreeCache {

// Private methods
void Init();
Int_t StartThreadUnzip(Int_t nthreads);
Int_t StopThreadUnzip();

public:
TTreeCacheUnzip();
TTreeCacheUnzip(TTree *tree, Int_t buffersize=0);
virtual ~TTreeCacheUnzip();

virtual Int_t AddBranch(TBranch *b, Bool_t subbranches = kFALSE);
virtual Int_t AddBranch(const char *branch, Bool_t subbranches = kFALSE);
Bool_t FillBuffer();
Expand All @@ -108,31 +139,29 @@ class TTreeCacheUnzip : public TTreeCache {
static Bool_t IsParallelUnzip();
static Int_t SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option = TTreeCacheUnzip::kEnable);

Bool_t IsActiveThread();
Bool_t IsQueueEmpty();

void WaitUnzipStartSignal();
void SendUnzipStartSignal(Bool_t broadcast);

// Unzipping related methods
#ifdef R__USE_IMT
Int_t CreateTasks();
#endif
Int_t GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen);
virtual void ResetCache();
virtual Int_t GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free);
// Return fUnzipGroupSize, the min accumulated size of a group of baskets ready to be
// unzipped by an IMT task. Declared const so it can be called on a const cache object.
Int_t GetUnzipGroupSize() const { return fUnzipGroupSize; }
virtual void ResetCache();
virtual Int_t SetBufferSize(Int_t buffersize);
void SetUnzipBufferSize(Long64_t bufferSize);
// Store groupSize in fUnzipGroupSize (the value returned by GetUnzipGroupSize()).
void SetUnzipGroupSize(Int_t groupSize)
{
   fUnzipGroupSize = groupSize;
}
static void SetUnzipRelBufferSize(Float_t relbufferSize);
Int_t UnzipBuffer(char **dest, char *src);
Int_t UnzipCache(Int_t &startindex, Int_t &locbuffsz, char *&locbuff);
Int_t UnzipCache(Int_t index);

// Methods to get stats
// Read-only accessors for the statistics counters; const so they work on const objects.
Int_t GetNUnzip() const { return fNUnzip; }   ///< Number of blocks that were unzipped
Int_t GetNFound() const { return fNFound; }   ///< Number of blocks that were found in the cache
Int_t GetNMissed() const { return fNMissed; } ///< Number of blocks not found in the cache and unzipped
Int_t GetNFound() const { return fNFound; }   ///< Number of blocks that were found in the cache

void Print(Option_t* option = "") const;

// static members
static void* UnzipLoop(void *arg);
ClassDef(TTreeCacheUnzip,0) //Specialization of TTreeCache for parallel unzipping
};

Expand Down
4 changes: 3 additions & 1 deletion tree/tree/src/TTree.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -5433,7 +5433,7 @@ Int_t TTree::GetEntry(Long64_t entry, Int_t getall)
#ifdef R__USE_IMT
// At most one parallel read with a single branch
unsigned int nSortedBranches(2);
if (nSortedBranches > 1 && ROOT::IsImplicitMTEnabled() && fIMTEnabled) {
if (nSortedBranches > 1 && ROOT::IsImplicitMTEnabled() && fIMTEnabled && !TTreeCacheUnzip::IsParallelUnzip()) {
if (fSortedBranches.empty()) {
InitializeBranchLists(true);
nSortedBranches = fSortedBranches.size();
Expand Down Expand Up @@ -8383,9 +8383,11 @@ Int_t TTree::SetCacheSizeAux(Bool_t autocache /* = kTRUE */, Long64_t cacheSize
return 0;
}

#ifdef R__USE_IMT
if(TTreeCacheUnzip::IsParallelUnzip() && file->GetCompressionLevel() > 0)
pf = new TTreeCacheUnzip(this, cacheSize);
else
#endif
pf = new TTreeCache(this, cacheSize);

pf->SetAutoCreated(autocache);
Expand Down
Loading