Skip to content

Commit

Permalink
log allreduce/broadcast caller, engine should track caller stack for
Browse files Browse the repository at this point in the history
investigation
  • Loading branch information
chenqin committed Jun 19, 2019
1 parent e391238 commit a9d7331
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 23 deletions.
6 changes: 4 additions & 2 deletions include/rabit/c_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ RABIT_DLL void RabitGetProcessorName(char *out_name,
* \param root the root of process
*/
RABIT_DLL void RabitBroadcast(void *sendrecv_data,
rbt_ulong size, int root);
rbt_ulong size, int root,
const char* caller = __builtin_FUNCTION());
/*!
* \brief perform in-place allreduce, on sendrecvbuf
* this function is NOT thread-safe
Expand All @@ -108,7 +109,8 @@ RABIT_DLL void RabitAllreduce(void *sendrecvbuf,
int enum_dtype,
int enum_op,
void (*prepare_fun)(void *arg),
void *prepare_arg);
void *prepare_arg,
const char* caller = __builtin_FUNCTION());

/*!
* \brief load latest check point
Expand Down
34 changes: 22 additions & 12 deletions include/rabit/internal/rabit-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,36 +127,39 @@ inline std::string GetProcessorName(void) {
return engine::GetEngine()->GetHost();
}
// broadcast data to all other nodes from root
inline void Broadcast(void *sendrecv_data, size_t size, int root) {
inline void Broadcast(void *sendrecv_data, size_t size, int root, const char* caller) {
utils::Printf("[%d] Broadcast caller is %s \n", GetRank(), caller);
engine::GetEngine()->Broadcast(sendrecv_data, size, root);
}
template<typename DType>
inline void Broadcast(std::vector<DType> *sendrecv_data, int root) {
inline void Broadcast(std::vector<DType> *sendrecv_data, int root, const char* caller) {
size_t size = sendrecv_data->size();
Broadcast(&size, sizeof(size), root);
if (sendrecv_data->size() != size) {
sendrecv_data->resize(size);
}
if (size != 0) {
Broadcast(&(*sendrecv_data)[0], size * sizeof(DType), root);
Broadcast(&(*sendrecv_data)[0], size * sizeof(DType), root, caller);
}
}
inline void Broadcast(std::string *sendrecv_data, int root) {
inline void Broadcast(std::string *sendrecv_data, int root, const char* caller) {
size_t size = sendrecv_data->length();
Broadcast(&size, sizeof(size), root);
Broadcast(&size, sizeof(size), root, caller);
if (sendrecv_data->length() != size) {
sendrecv_data->resize(size);
}
if (size != 0) {
Broadcast(&(*sendrecv_data)[0], size * sizeof(char), root);
Broadcast(&(*sendrecv_data)[0], size * sizeof(char), root, caller);
}
}

// perform inplace Allreduce
template<typename OP, typename DType>
inline void Allreduce(DType *sendrecvbuf, size_t count,
void (*prepare_fun)(void *arg),
void *prepare_arg) {
void *prepare_arg,
const char* caller) {
utils::Printf("[%d] Allreduce caller is %s \n", GetRank(), caller);
engine::Allreduce_(sendrecvbuf, sizeof(DType), count, op::Reducer<OP, DType>,
engine::mpi::GetType<DType>(), OP::kType, prepare_fun, prepare_arg);
}
Expand All @@ -167,7 +170,8 @@ inline void InvokeLambda_(void *fun) {
(*static_cast<std::function<void()>*>(fun))();
}
template<typename OP, typename DType>
inline void Allreduce(DType *sendrecvbuf, size_t count, std::function<void()> prepare_fun) {
inline void Allreduce(DType *sendrecvbuf, size_t count, std::function<void()> prepare_fun, const char* caller) {
utils::Printf("[%d] Allreduce caller is %s \n", GetRank(), caller);
engine::Allreduce_(sendrecvbuf, sizeof(DType), count, op::Reducer<OP, DType>,
engine::mpi::GetType<DType>(), OP::kType, InvokeLambda_, &prepare_fun);
}
Expand Down Expand Up @@ -288,7 +292,7 @@ inline Reducer<DType, freduce>::Reducer(void) {
template<typename DType, void (*freduce)(DType &dst, const DType &src)> // NOLINT(*)
inline void Reducer<DType, freduce>::Allreduce(DType *sendrecvbuf, size_t count,
void (*prepare_fun)(void *arg),
void *prepare_arg) {
void *prepare_arg, const char* caller) {
handle_.Allreduce(sendrecvbuf, sizeof(DType), count, prepare_fun, prepare_arg);
}
// function to perform reduction for SerializeReducer
Expand Down Expand Up @@ -337,7 +341,9 @@ template<typename DType>
inline void SerializeReducer<DType>::Allreduce(DType *sendrecvobj,
size_t max_nbyte, size_t count,
void (*prepare_fun)(void *arg),
void *prepare_arg) {
void *prepare_arg,
const char* caller) {
utils::Printf("[%d] Allreduce caller is %s \n", GetRank(), caller);
buffer_.resize(max_nbyte * count);
// setup closure
SerializeReduceClosure<DType> c;
Expand All @@ -355,13 +361,17 @@ inline void SerializeReducer<DType>::Allreduce(DType *sendrecvobj,
#if DMLC_USE_CXX11
template<typename DType, void (*freduce)(DType &dst, const DType &src)> // NOLINT(*)g
inline void Reducer<DType, freduce>::Allreduce(DType *sendrecvbuf, size_t count,
std::function<void()> prepare_fun) {
std::function<void()> prepare_fun,
const char* caller) {
utils::Printf("[%d] Allreduce caller is %s \n", GetRank(), caller);
this->Allreduce(sendrecvbuf, count, InvokeLambda_, &prepare_fun);
}
template<typename DType>
inline void SerializeReducer<DType>::Allreduce(DType *sendrecvobj,
size_t max_nbytes, size_t count,
std::function<void()> prepare_fun) {
std::function<void()> prepare_fun,
const char* caller) {
utils::Printf("[%d] Allreduce caller is %s \n", GetRank(), caller);
this->Allreduce(sendrecvobj, max_nbytes, count, InvokeLambda_, &prepare_fun);
}
#endif // DMLC_USE_CXX11
Expand Down
19 changes: 10 additions & 9 deletions include/rabit/rabit.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#define RABIT_RABIT_H_ // NOLINT(*)
#include <string>
#include <vector>
#include <cstdio>

// whether or not use c++11 support
#ifndef DMLC_USE_CXX11
Expand Down Expand Up @@ -142,7 +143,7 @@ inline void TrackerGetConfig(const char *key, char* value, ...);
* \param size the data size
* \param root the process root
*/
inline void Broadcast(void *sendrecv_data, size_t size, int root);
inline void Broadcast(void *sendrecv_data, size_t size, int root, const char* caller = __builtin_FUNCTION());
/*!
* \brief broadcasts an std::vector<DType> to every node from root
* \param sendrecv_data the pointer to send/receive vector,
Expand All @@ -152,14 +153,14 @@ inline void Broadcast(void *sendrecv_data, size_t size, int root);
* that can be directly transmitted by sending the sizeof(DType)
*/
template<typename DType>
inline void Broadcast(std::vector<DType> *sendrecv_data, int root);
inline void Broadcast(std::vector<DType> *sendrecv_data, int root, const char* caller = __builtin_FUNCTION());
/*!
* \brief broadcasts a std::string to every node from the root
* \param sendrecv_data the pointer to the send/receive buffer,
* for the receiver, the vector does not need to be pre-allocated
* \param root the process root
*/
inline void Broadcast(std::string *sendrecv_data, int root);
inline void Broadcast(std::string *sendrecv_data, int root, const char* caller = __builtin_FUNCTION());
/*!
* \brief performs in-place Allreduce on sendrecvbuf
* this function is NOT thread-safe
Expand All @@ -184,7 +185,7 @@ inline void Broadcast(std::string *sendrecv_data, int root);
template<typename OP, typename DType>
inline void Allreduce(DType *sendrecvbuf, size_t count,
void (*prepare_fun)(void *) = NULL,
void *prepare_arg = NULL);
void *prepare_arg = NULL, const char* caller = __builtin_FUNCTION());
// C++11 support for lambda prepare function
#if DMLC_USE_CXX11
/*!
Expand Down Expand Up @@ -213,7 +214,7 @@ inline void Allreduce(DType *sendrecvbuf, size_t count,
*/
template<typename OP, typename DType>
inline void Allreduce(DType *sendrecvbuf, size_t count,
std::function<void()> prepare_fun);
std::function<void()> prepare_fun, const char* caller = __builtin_FUNCTION());
#endif // C++11
/*!
* \brief loads the latest check point
Expand Down Expand Up @@ -312,7 +313,7 @@ class Reducer {
*/
inline void Allreduce(DType *sendrecvbuf, size_t count,
void (*prepare_fun)(void *) = NULL,
void *prepare_arg = NULL);
void *prepare_arg = NULL, const char* caller = __builtin_FUNCTION());
#if DMLC_USE_CXX11
/*!
* \brief customized in-place all reduce operation, with lambda function as preprocessor
Expand All @@ -321,7 +322,7 @@ class Reducer {
* \param prepare_fun lambda function executed to prepare the data, if necessary
*/
inline void Allreduce(DType *sendrecvbuf, size_t count,
std::function<void()> prepare_fun);
std::function<void()> prepare_fun, const char* caller = __builtin_FUNCTION());
#endif // DMLC_USE_CXX11

private:
Expand Down Expand Up @@ -356,7 +357,7 @@ class SerializeReducer {
inline void Allreduce(DType *sendrecvobj,
size_t max_nbyte, size_t count,
void (*prepare_fun)(void *) = NULL,
void *prepare_arg = NULL);
void *prepare_arg = NULL, const char* caller = __builtin_FUNCTION());
// C++11 support for lambda prepare function
#if DMLC_USE_CXX11
/*!
Expand All @@ -369,7 +370,7 @@ class SerializeReducer {
*/
inline void Allreduce(DType *sendrecvobj,
size_t max_nbyte, size_t count,
std::function<void()> prepare_fun);
std::function<void()> prepare_fun, const char* caller = __builtin_FUNCTION());
#endif // DMLC_USE_CXX11

private:
Expand Down

0 comments on commit a9d7331

Please sign in to comment.