Skip to content

Commit

Permalink
Start managed arg detector, change cudaLaunchManaged
Browse files Browse the repository at this point in the history
to facilitate template matching (old return type too restrictive
and incompatible with cudaPtrs)
- Managed arg detector doesn't match as it should. Tests succeed
if it isn't used at line cms-sw#95 of thread_pool_TBBQueueBlocking.h
  • Loading branch information
Oblynx committed Aug 19, 2015
1 parent 809e7f5 commit 0888f6f
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 38 deletions.
82 changes: 50 additions & 32 deletions FWCore/Services/interface/thread_pool_TBBQueueBlocking.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
/**
/*
CMSSW CUDA management and thread pool Service
Author: Konstantinos Samaras-Tsakiris, [email protected]
*//*
--> Thread Pool:
Copyright (c) 2012 Jakob Progsch, Václav Zeman
This software is provided 'as-is', without any express or implied
warranty. In no event will the authors be held liable for any damages
Expand All @@ -16,7 +20,6 @@ freely, subject to the following restrictions:
distribution.
--> This is an altered version of the original code.
Editor: Konstantinos Samaras-Tsakiris, [email protected]
*/

#ifndef Thread_Pool_Service_H
Expand Down Expand Up @@ -81,34 +84,42 @@ class ThreadPoolService {
// Launch kernel function with args
// Configure execution policy before launch!
template<typename F, typename... Args>
inline std::future<typename std::result_of<F(Args...)>::type>
inline std::future<void>
cudaLaunchManaged(const cudaConfig::ExecutionPolicy& execPol, F&& f, Args&&... args)
{
return getFuture([&](){
#ifdef __NVCC__
f<<<execPol.getGridSize(), execPol.getBlockSize(),
execPol.getSharedMemBytes()>>>(args...);
#endif
//std::cout<<"[In task]: Launched\n";
cudaStreamSynchronize(cudaStreamPerThread);
//std::cout<<"[In task]: Synced\n";
});
using packaged_task_t = std::packaged_task<void()>;

std::shared_ptr<packaged_task_t> task(new packaged_task_t([&](){
#ifdef __NVCC__
f<<<execPol.getGridSize(), execPol.getBlockSize(),
execPol.getSharedMemBytes()>>>(
std::forward<Args>(args)...);
#endif
//std::cout<<"[In task]: Launched\n";
cudaStreamSynchronize(cudaStreamPerThread);
//std::cout<<"[In task]: Synced\n";
}));
std::future<void> resultFut = task->get_future();
tasks_.emplace([task](){ (*task)(); });
return resultFut;
}
// Overload: differentiate between managed-nonmanaged args
template<typename F, typename... NMArgs, template<typename...> class NM,
typename... MArgs, template<typename...> class M>
typename std::enable_if<std::is_same<NM<NMArgs...>,utils::NonManagedArgs<NMArgs...>>::value
&& std::is_same<M<MArgs...>,utils::ManagedArgs<MArgs...>>::value,
std::future<typename std::result_of<F(void)>>>::type
cudaLaunchManaged(const cudaConfig::ExecutionPolicy& execPol,
F&& f, NM<NMArgs...>&& nonManaged, M<MArgs...>&& managed)
;/*{
std::cout<<"Separate managed-unmanaged args!\n";
//return cudaLaunchManaged(std::forward<F>(f), nonManaged.forward(), managed.forward());
return unpackArgsTuple(typename GenSeq<sizeof...(NMArgs)+
sizeof...(MArgs)>::type(), std::forward<F>(f),
merge(nonManaged,managed));
}*/

/*template<typename T>
??? attachManagedMemory(T&& arg)
{
IFcudaPointer(std::forward<Head>(first));
*//*struct CudaPtrArg{
void operate(){
}
};
struct NonCudaPtrArg{
void operate(){}
};
std::conditional<std::is_same<Head, cudaPointer>, CudaPtrArg, NonCudaPtrArg>::
type::operate(std::forward<Head>(first));*/
//}

template<typename F>
static cudaConfig::ExecutionPolicy configureLaunch(int totalThreads, F&& f){
cudaConfig::ExecutionPolicy execPol;
Expand All @@ -126,11 +137,18 @@ class ThreadPoolService {
//!< @brief Joins all threads
void stopWorkers();
private:
/*template<int... S, typename F, typename... NMArgs, template<typename...> class NM,
typename... MArgs, template<typename...> class M>
void unpackArgsTuple(Seq<S...>, F&& f, NM<NMArgs...>&& nonMan, M<MArgs...>&& man){
}*/
template<typename T, typename std::enable_if<std::is_base_of<cudaPtrBase, T>::value>::type>
auto preprocessManagedMem(T&& cudaPtr) -> decltype(cudaPtr.p){
std::cout<<"[memAttach]: Managed arg!\n";
cudaPtr.attachStream();
return cudaPtr.p;
}
template<typename T, typename std::enable_if<!std::is_base_of<cudaPtrBase, T>::value>::type>
auto preprocessManagedMem(T&& valueArg) -> decltype(valueArg){
//Do nothing
std::cout<<"[memAttach]: value arg\n";
return valueArg;
}
// need to keep track of threads so we can join them
std::vector< std::thread > workers_;
// the task concurrent queue
Expand Down
14 changes: 12 additions & 2 deletions FWCore/Services/interface/utils/cuda_pointer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,28 @@
// std::unique_ptr<T> p;
// };

class cudaPtrBase {};

template<typename T>
class cudaPointer{
class cudaPointer: cudaPtrBase{
public:
//flag: cudaMemAttachGlobal | cudaMemAttachHost
cudaPointer(int elementN, unsigned flag=cudaMemAttachGlobal): p(new T){
cudaPointer(int elementN, unsigned flag=cudaMemAttachGlobal): p(new T), attachment(flag){
cudaMallocManaged(&p, elementN*sizeof(T), flag);
}
//p must retain ownership until here!
~cudaPointer(){
cudaFree(p);
}
//Only call default if on a new thread
void attachStream(cudaStream_t stream= cudaStreamPerThread){
attachment= cudaMemAttachSingle;
cudaStreamAttachMemAsync(stream, p, 0, attachment);
}

operator T*(){ return p; }
//public!
T* p;
private:
unsigned attachment;
};
6 changes: 2 additions & 4 deletions FWCore/Services/test/test_threadPool_service.cppunit.cu
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,7 @@ void TestThreadPoolService::CUDAAutolaunchCUDAPTRTest()
cout<<"Launching auto...\n";
// Auto launch config
cudaConfig::ExecutionPolicy execPol((*poolPtr)->configureLaunch(n, longKernel));
(*poolPtr)->cudaLaunchManaged(execPol, longKernel, (int)n,(int)times,
const_cast<const float*>(in.p),out.p).get();
(*poolPtr)->cudaLaunchManaged(execPol, longKernel, (int)n,(int)times, in,out).get();
for(int i=0; i<n; i++) if (times*in.p[i]-out.p[i]>TOLERANCE || times*in.p[i]-out.p[i]<-TOLERANCE){
cout<<"ERROR: i="<<i<<'\n';
CPPUNIT_ASSERT_DOUBLES_EQUAL(times*in.p[i], out.p[i], TOLERANCE);
Expand All @@ -251,8 +250,7 @@ void TestThreadPoolService::CUDAAutolaunchCUDAPTRTest()
cout<<"Launching manual...\n";
// Manual launch config
execPol= cudaConfig::ExecutionPolicy(320, (n-1+320)/320);
(*poolPtr)->cudaLaunchManaged(execPol, longKernel, (int)n,(int)times,
const_cast<const float*>(in.p),out.p).get();
(*poolPtr)->cudaLaunchManaged(execPol, longKernel, (int)n,(int)times, in,out).get();
for(int i=0; i<n; i++) if (times*in.p[i]-out.p[i]>TOLERANCE || times*in.p[i]-out.p[i]<-TOLERANCE){
cout<<"ERROR: i="<<i<<'\n';
CPPUNIT_ASSERT_DOUBLES_EQUAL(times*in.p[i], out.p[i], TOLERANCE);
Expand Down

0 comments on commit 0888f6f

Please sign in to comment.