Skip to content

Commit

Permalink
ENH: renaming NumberOfThreads into NumberOfWorkUnits in MultiThreader…
Browse files Browse the repository at this point in the history
…Base

Also expose work units in domain threader

Change-Id: I1dcf581a41fdcfeb04649dae21552f6d66a64a3e
  • Loading branch information
dzenanz committed Jul 17, 2018
1 parent 38f516d commit ce15429
Show file tree
Hide file tree
Showing 67 changed files with 642 additions and 495 deletions.
33 changes: 23 additions & 10 deletions Modules/Core/Common/include/itkDomainThreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace itk
* Since a threaded operation is relatively complex compared to a simple serial
* operation, a class instead of a simple method is required. Inside this
* class, the method to partition the data is handled, the logic for
* determining the number of threads is determined, and operations surrounding
* deciding the number of work units is determined, and operations surrounding
* the threading are encapsulated into the class with the
* \c DetermineNumberOfWorkUnitsToUse, \c BeforeThreadedExecution, \c ThreadedExecution,
* and \c AfterThreadedExecution virtual methods.
Expand Down Expand Up @@ -90,22 +90,35 @@ class ITK_TEMPLATE_EXPORT DomainThreader: public Object
void Execute( AssociateType * enclosingClass, const DomainType & domain );

/** Set/Get the DomainPartitioner. */
itkSetObjectMacro( DomainPartitioner, DomainPartitionerType );
itkGetModifiableObjectMacro(DomainPartitioner, DomainPartitionerType );
itkSetObjectMacro( DomainPartitioner, DomainPartitionerType );
itkGetModifiableObjectMacro( DomainPartitioner, DomainPartitionerType );

/** Accessor for number of threads that were actually used in the last
/** Accessor for number of work units that were actually used in the last
* ThreadedExecution. */
itkGetConstMacro( NumberOfWorkUnitsUsed, ThreadIdType );

/** Return the multithreader used by this class. */
MultiThreaderBase * GetMultiThreader() const;

/** Convenience methods to set/get the maximum number of work units to use.
* \warning When setting the maximum number of work units, it will be clamped by
* itk::MultiThreaderBase::GetGlobalMaximumNumberOfThreads() and ITK_MAX_THREADS.
/** Convenience methods to set/get the desired number of work units to use.
* \warning When setting the desired number of work units, it might be clamped by
* itk::MultiThreaderBase::GetGlobalMaximumNumberOfThreads().
* */
ThreadIdType GetMaximumNumberOfThreads() const;
void SetMaximumNumberOfThreads( const ThreadIdType threads );
ThreadIdType GetNumberOfWorkUnits() const
{
return this->m_MultiThreader->GetNumberOfWorkUnits();
}
void SetNumberOfWorkUnits( const ThreadIdType workUnits );

/** Convenience methods to set/get the maximum number of threads to use.
* \warning When setting the maximum number of threads, it will be clamped by
* itk::MultiThreaderBase::GetGlobalMaximumNumberOfThreads().
* */
ThreadIdType GetMaximumNumberOfThreads() const
{
return this->m_MultiThreader->GetMaximumNumberOfThreads();
}
void SetMaximumNumberOfThreads(const ThreadIdType threads);

protected:
DomainThreader();
Expand Down Expand Up @@ -154,7 +167,7 @@ class ITK_TEMPLATE_EXPORT DomainThreader: public Object
DomainThreader * domainThreader;
};

/** Store the actual number of threads used, which may be less than
/** Store the actual number of work units used, which may be less than
* the number allocated by the threader if the object does not split
* well into that number.
* This value is determined at the beginning of \c Execute(). */
Expand Down
24 changes: 14 additions & 10 deletions Modules/Core/Common/include/itkDomainThreader.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,15 @@ DomainThreader< TDomainPartitioner, TAssociate >
}

template< typename TDomainPartitioner, typename TAssociate >
ThreadIdType
void
DomainThreader< TDomainPartitioner, TAssociate >
::GetMaximumNumberOfThreads() const
::SetNumberOfWorkUnits( const ThreadIdType workUnits )
{
return this->m_MultiThreader->GetNumberOfThreads();
if( workUnits != this->GetNumberOfWorkUnits() )
{
this->m_MultiThreader->SetNumberOfWorkUnits( workUnits );
this->Modified();
};
}

template< typename TDomainPartitioner, typename TAssociate >
Expand All @@ -62,7 +66,7 @@ DomainThreader< TDomainPartitioner, TAssociate >
{
if( threads != this->GetMaximumNumberOfThreads() )
{
this->m_MultiThreader->SetNumberOfThreads( threads );
this->m_MultiThreader->SetMaximumNumberOfThreads( threads );
this->Modified();
}
}
Expand Down Expand Up @@ -90,7 +94,7 @@ void
DomainThreader< TDomainPartitioner, TAssociate >
::DetermineNumberOfWorkUnitsUsed()
{
const ThreadIdType threaderNumberOfThreads = this->GetMultiThreader()->GetNumberOfThreads();
const ThreadIdType threaderNumberOfThreads = this->GetMultiThreader()->GetNumberOfWorkUnits();

// Attempt a single dummy partition, just to get the number of subdomains actually created
DomainType subdomain;
Expand All @@ -102,10 +106,10 @@ DomainThreader< TDomainPartitioner, TAssociate >
if( this->m_NumberOfWorkUnitsUsed < threaderNumberOfThreads )
{
// If PartitionDomain is only able to create a lesser number of subdomains,
// ensure that superfluous threads aren't created
// ensure that superfluous work units aren't created
// DomainThreader::SetMaximumNumberOfThreads *should* already have been called by this point,
// but it's not fatal if it somehow gets called later
this->GetMultiThreader()->SetNumberOfThreads(this->m_NumberOfWorkUnitsUsed);
this->GetMultiThreader()->SetNumberOfWorkUnits(this->m_NumberOfWorkUnitsUsed);
}
else if( this->m_NumberOfWorkUnitsUsed > threaderNumberOfThreads )
{
Expand Down Expand Up @@ -135,11 +139,11 @@ ITK_THREAD_RETURN_TYPE
DomainThreader< TDomainPartitioner, TAssociate >
::ThreaderCallback( void* arg )
{
auto * info = static_cast<MultiThreaderBase::ThreadInfoStruct *>(arg);
auto * info = static_cast<MultiThreaderBase::WorkUnitInfo *>(arg);
auto * str = static_cast<ThreadStruct *>(info->UserData);
DomainThreader *thisDomainThreader = str->domainThreader;
const ThreadIdType threadId = info->ThreadID;
const ThreadIdType threadCount = info->NumberOfThreads;
const ThreadIdType threadId = info->WorkUnitID;
const ThreadIdType threadCount = info->NumberOfWorkUnits;

// Get the sub-domain to process for this thread.
DomainType subdomain;
Expand Down
2 changes: 1 addition & 1 deletion Modules/Core/Common/include/itkImageSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class ITK_TEMPLATE_EXPORT ImageSource
* in its own specific thread. Pool and TBB multi-threaders maintain
* a pool of threads (normally equal to number of processing cores)
* which they use to process the pieces. This normally results
* in a single thread being reused to process multiple pieces.
* in a single thread being reused to process multiple work units.
*
* \sa GenerateData(), SplitRequestedRegion() */
virtual void ThreadedGenerateData(const OutputImageRegionType & outputRegionForThread,
Expand Down
10 changes: 5 additions & 5 deletions Modules/Core/Common/include/itkImageSource.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ ImageSource<TOutputImage>
const ImageRegionSplitterBase * splitter = this->GetImageRegionSplitter();
const unsigned int validThreads = splitter->GetNumberOfSplits(outputPtr->GetRequestedRegion(), this->GetNumberOfWorkUnits());

this->GetMultiThreader()->SetNumberOfThreads(validThreads);
this->GetMultiThreader()->SetNumberOfWorkUnits(validThreads);
this->GetMultiThreader()->SetSingleMethod(callbackFunction, &str);

this->GetMultiThreader()->SingleMethodExecute();
Expand All @@ -238,7 +238,7 @@ ImageSource< TOutputImage >
}
else
{
this->GetMultiThreader()->SetNumberOfThreads(this->GetNumberOfWorkUnits());
this->GetMultiThreader()->SetNumberOfWorkUnits(this->GetNumberOfWorkUnits());
this->GetMultiThreader()->template ParallelizeImageRegion<OutputImageDimension>(
this->GetOutput()->GetRequestedRegion(),
[this](const OutputImageRegionType & outputRegionForThread)
Expand Down Expand Up @@ -287,10 +287,10 @@ ITK_THREAD_RETURN_TYPE
ImageSource< TOutputImage >
::ThreaderCallback(void *arg)
{
using ThreadInfo = MultiThreaderBase::ThreadInfoStruct;
using ThreadInfo = MultiThreaderBase::WorkUnitInfo;
ThreadInfo * threadInfo = static_cast<ThreadInfo *>(arg);
ThreadIdType threadId = threadInfo->ThreadID;
ThreadIdType threadCount = threadInfo->NumberOfThreads;
ThreadIdType threadId = threadInfo->WorkUnitID;
ThreadIdType threadCount = threadInfo->NumberOfWorkUnits;
ThreadStruct *str = (ThreadStruct *)(threadInfo->UserData);

// execute the actual method with appropriate output region
Expand Down
2 changes: 1 addition & 1 deletion Modules/Core/Common/include/itkImageToImageFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace itk
* GenerateData(), the image processing will run in a single thread and the
* implementation is responsible for allocating its output data. If a filter
* provides an implementation of ThreadedGenerateData() instead, the image
* will be divided into a number of pieces, a number of threads will be
* will be divided into a number of work units, a number of threads will be
* spawned, and ThreadedGenerateData() will be called in each thread. Here,
* the output memory will be allocated by this superclass prior to calling
* ThreadedGenerateData().
Expand Down
8 changes: 4 additions & 4 deletions Modules/Core/Common/include/itkImageTransformer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ ImageTransformer< TInputImage >
ThreadStruct str;
str.Filter = this;

this->GetMultiThreader()->SetNumberOfThreads(this->GetNumberOfWorkUnits());
this->GetMultiThreader()->SetNumberOfWorkUnits(this->GetNumberOfWorkUnits());
this->GetMultiThreader()->SetSingleMethod(callbackFunction, &str);

this->GetMultiThreader()->SingleMethodExecute();
Expand Down Expand Up @@ -337,10 +337,10 @@ ImageTransformer< TInputImage >
ThreadStruct *str;
ThreadIdType total, threadId, threadCount;

threadId = ( (MultiThreaderBase::ThreadInfoStruct *)( arg ) )->ThreadID;
threadCount = ( (MultiThreaderBase::ThreadInfoStruct *)( arg ) )->NumberOfThreads;
threadId = ( (MultiThreaderBase::WorkUnitInfo *)( arg ) )->WorkUnitID;
threadCount = ( (MultiThreaderBase::WorkUnitInfo *)( arg ) )->NumberOfWorkUnits;

str = (ThreadStruct *)( ( (MultiThreaderBase::ThreadInfoStruct *)( arg ) )->UserData );
str = (ThreadStruct *)( ( (MultiThreaderBase::WorkUnitInfo *)( arg ) )->UserData );

// execute the actual method with appropriate output region
// first find out how many pieces extent can be split into.
Expand Down
66 changes: 40 additions & 26 deletions Modules/Core/Common/include/itkMultiThreaderBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,17 @@ class ITKCommon_EXPORT MultiThreaderBase : public Object
/** Run-time type information (and related methods). */
itkTypeMacro(MultiThreaderBase, Object);

/** Get/Set the number of threads to create. It will be clamped to the range
/** Get/Set the number of threads to use. It will be clamped to the range
* [ 1, m_GlobalMaximumNumberOfThreads ], so the caller of this method should
* check that the requested number of threads was accepted. */
virtual void SetNumberOfThreads(ThreadIdType numberOfThreads);
itkGetConstMacro(NumberOfThreads, ThreadIdType);
virtual void SetMaximumNumberOfThreads( ThreadIdType numberOfThreads );
itkGetConstMacro( MaximumNumberOfThreads, ThreadIdType );

/** Get/Set the number of work units to create. It might be clamped to the range
* [ 1, SomeMaximumNumber ], so the caller of this method should
* check that the requested number of work units was accepted. */
virtual void SetNumberOfWorkUnits( ThreadIdType numberOfWorkUnits );
itkGetConstMacro( NumberOfWorkUnits, ThreadIdType );

/** Set/Get the maximum number of threads to use when multithreading. It
* will be clamped to the range [ 1, ITK_MAX_THREADS ] because several arrays
Expand Down Expand Up @@ -149,27 +155,33 @@ class ITKCommon_EXPORT MultiThreaderBase : public Object
/** This is the structure that is passed to the thread that is
* created from the SingleMethodExecute. It is passed in as a void *,
* and it is up to the method to cast correctly and extract the information.
* The ThreadID is a number between 0 and NumberOfThreads-1 that
* indicates the id of this thread. The UserData is the
* The WorkUnitID is a number between 0 and NumberOfWorkUnits-1 that
* indicates the id of this work unit. The UserData is the
* (void *)arg passed into the SetSingleMethod. */
struct ThreadInfoStruct
{
ThreadIdType ThreadID;
ThreadIdType NumberOfThreads;
void* UserData;
struct WorkUnitInfo
{
ThreadIdType WorkUnitID;
ThreadIdType NumberOfWorkUnits;
void* UserData;
ThreadFunctionType ThreadFunction;
enum { SUCCESS, ITK_EXCEPTION, ITK_PROCESS_ABORTED_EXCEPTION, STD_EXCEPTION, UNKNOWN } ThreadExitCode;
};
enum
{
SUCCESS,
ITK_EXCEPTION,
ITK_PROCESS_ABORTED_EXCEPTION,
STD_EXCEPTION,
UNKNOWN
} ThreadExitCode;
};

/** Execute the SingleMethod (as define by SetSingleMethod) using
* m_NumberOfThreads threads. As a side effect the m_NumberOfThreads will be
* m_NumberOfWorkUnits threads. As a side effect the m_NumberOfWorkUnits will be
* checked against the current m_GlobalMaximumNumberOfThreads and clamped if
* necessary. */
virtual void SingleMethodExecute() = 0;

/** Set the SingleMethod to f() and the UserData field of the
* ThreadInfoStruct that is passed to it will be data.
* This method (and all the methods passed to SetMultipleMethod)
* WorkUnitInfo that is passed to it will be data. This method
* must be of type itkThreadFunctionType and must take a single argument of
* type void *. */
virtual void SetSingleMethod(ThreadFunctionType, void *data) = 0;
Expand Down Expand Up @@ -264,23 +276,25 @@ class ITKCommon_EXPORT MultiThreaderBase : public Object

static ITK_THREAD_RETURN_TYPE ParallelizeImageRegionHelper(void *arg);

/** The number of work units to create. */
ThreadIdType m_NumberOfWorkUnits;

/** The number of threads to use.
* The m_NumberOfThreads must always be less than or equal to
* The m_MaximumNumberOfThreads must always be less than or equal to
* the m_GlobalMaximumNumberOfThreads before it is used during the execution
* of a threaded method. Its value is clamped in the SingleMethodExecute()
* and MultipleMethodExecute(). Its value is initialized to
* of a threaded method. Its value is initialized to
* m_GlobalDefaultNumberOfThreads at construction time. Its value is clamped
* to the current m_GlobalMaximumNumberOfThreads in the
* SingleMethodExecute() and MultipleMethodExecute() methods.
* SingleMethodExecute() method.
*/
ThreadIdType m_NumberOfThreads;
ThreadIdType m_MaximumNumberOfThreads;

/** Static function used as a "proxy callback" by multi-threaders. The
* threading library will call this routine for each thread, which
* will delegate the control to the prescribed SingleMethod. This
* routine acts as an intermediary between the multi-threaders and the
* user supplied callback (SingleMethod) in order to catch any
* exceptions thrown by the threads. */
* threading library will call this routine for each thread, which
* will delegate the control to the prescribed SingleMethod. This
* routine acts as an intermediary between the multi-threaders and the
* user supplied callback (SingleMethod) in order to catch any
* exceptions thrown by the threads. */
static ITK_THREAD_RETURN_TYPE SingleMethodProxy(void *arg);

/** The method to invoke. */
Expand All @@ -300,5 +314,5 @@ class ITKCommon_EXPORT MultiThreaderBase : public Object
ITKCommon_EXPORT std::ostream& operator << (std::ostream& os,
const MultiThreaderBase::ThreaderType& threader);

} // end namespace itk
} // end namespace itk
#endif
21 changes: 12 additions & 9 deletions Modules/Core/Common/include/itkPlatformMultiThreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,26 +105,26 @@ It can affect all MultiThreaderBase's derived classes in ITK");
#endif

/** Execute the SingleMethod (as define by SetSingleMethod) using
* m_NumberOfThreads threads. As a side effect the m_NumberOfThreads will be
* m_NumberOfWorkUnits threads. As a side effect the m_NumberOfWorkUnits will be
* checked against the current m_GlobalMaximumNumberOfThreads and clamped if
* necessary. */
void SingleMethodExecute() override;

/** Execute the MultipleMethods (as define by calling SetMultipleMethod for
* each of the required m_NumberOfThreads methods) using m_NumberOfThreads
* threads. As a side effect the m_NumberOfThreads will be checked against the
* each of the required m_NumberOfWorkUnits methods) using m_NumberOfWorkUnits
* threads. As a side effect the m_NumberOfWorkUnits will be checked against the
* current m_GlobalMaximumNumberOfThreads and clamped if necessary. */
itkLegacyMacro(void MultipleMethodExecute());

/** Set the SingleMethod to f() and the UserData field of the
* ThreadInfoStruct that is passed to it will be data.
* WorkUnitInfo that is passed to it will be data.
* This method (and all the methods passed to SetMultipleMethod)
* must be of type itkThreadFunctionType and must take a single argument of
* type void *. */
void SetSingleMethod(ThreadFunctionType, void *data) override;

/** Set the MultipleMethod at the given index to f() and the UserData
* field of the ThreadInfoStruct that is passed to it will be data. */
* field of the WorkUnitInfo that is passed to it will be data. */
itkLegacyMacro(void SetMultipleMethod(ThreadIdType index, ThreadFunctionType, void *data));

/** Create a new thread for the given function. Return a thread id
Expand All @@ -137,7 +137,10 @@ It can affect all MultiThreaderBase's derived classes in ITK");
* Deprecated. Use C++11 thread support instead. */
itkLegacyMacro(void TerminateThread(ThreadIdType thread_id));

struct ThreadInfoStruct: MultiThreaderBase::ThreadInfoStruct
virtual void SetMaximumNumberOfThreads( ThreadIdType numberOfThreads ) override;
virtual void SetNumberOfWorkUnits( ThreadIdType numberOfWorkUnits ) override;

struct WorkUnitInfo: MultiThreaderBase::WorkUnitInfo
{
int *ActiveFlag;
MutexLock::Pointer ActiveFlagLock;
Expand All @@ -152,14 +155,14 @@ It can affect all MultiThreaderBase's derived classes in ITK");
/** An array of thread info containing a thread id
* (0, 1, 2, .. ITK_MAX_THREADS-1), the thread count, and a pointer
* to void so that user data can be passed to each thread. */
ThreadInfoStruct m_ThreadInfoArray[ITK_MAX_THREADS];
WorkUnitInfo m_ThreadInfoArray[ITK_MAX_THREADS];

/** Storage of MutexFunctions and ints used to control spawned
* threads and the spawned thread ids. */
int m_SpawnedThreadActiveFlag[ITK_MAX_THREADS];
MutexLock::Pointer m_SpawnedThreadActiveFlagLock[ITK_MAX_THREADS];
ThreadProcessIdType m_SpawnedThreadProcessID[ITK_MAX_THREADS];
ThreadInfoStruct m_SpawnedThreadInfoArray[ITK_MAX_THREADS];
WorkUnitInfo m_SpawnedThreadInfoArray[ITK_MAX_THREADS];

#if !defined ( ITK_LEGACY_REMOVE )
/** The methods to invoke. */
Expand All @@ -170,7 +173,7 @@ It can affect all MultiThreaderBase's derived classes in ITK");
#endif

/** spawn a new thread for the SingleMethod */
ThreadProcessIdType SpawnDispatchSingleMethodThread(ThreadInfoStruct *);
ThreadProcessIdType SpawnDispatchSingleMethodThread(WorkUnitInfo *);
/** wait for a thread in the threadpool to finish work */
void SpawnWaitForSingleMethodThread(ThreadProcessIdType);

Expand Down
Loading

0 comments on commit ce15429

Please sign in to comment.