Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocking futures support #1

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions inc/hclib.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ struct _phased_t;
* @brief Function prototype executable by an async.
* @param[in] arg Arguments to the function
*/
typedef void (*asyncFct_t) (void * arg);
typedef void (*asyncFct_t)(void * arg);
typedef void *(*futureFct_t)(void *arg);

void hclib_launch(int * argc, char ** argv, asyncFct_t fct_ptr, void * arg);

Expand All @@ -83,6 +84,14 @@ void hclib_async(asyncFct_t fct_ptr, void * arg,
struct hclib_ddf_st ** ddf_list, struct _phased_t * phased_clause,
int property);

/*
* Spawn an async that automatically puts a DDF on termination. It is the user's
* responsibility to call hclib_ddf_free on the returned ddf_t.
*/
hclib_ddf_t *hclib_async_future(futureFct_t fp, void *arg,
hclib_ddf_t **ddf_list, struct _phased_t *phased_clause,
int property);

/*
* Forasync definition and API
*/
Expand All @@ -102,15 +111,15 @@ typedef int forasync_mode_t;
* @param[in] arg Argument to the loop iteration
* @param[in] index Current iteration index
*/
typedef void (*forasync1D_Fct_t) (void * arg,int index);
typedef void (*forasync1D_Fct_t)(void *arg, int index);

/**
* @brief Function prototype for a 2-dimensions forasync.
* @param[in] arg Argument to the loop iteration
* @param[in] index_outer Current outer iteration index
* @param[in] index_inner Current inner iteration index
*/
typedef void (*forasync2D_Fct_t) (void * arg,int index_outer,int index_inner);
typedef void (*forasync2D_Fct_t)(void *arg, int index_outer, int index_inner);

/**
* @brief Function prototype for a 3-dimensions forasync.
Expand All @@ -119,7 +128,8 @@ typedef void (*forasync2D_Fct_t) (void * arg,int index_outer,int index_inner);
* @param[in] index_mid Current intermediate iteration index
* @param[in] index_inner Current inner iteration index
*/
typedef void (*forasync3D_Fct_t) (void * arg,int index_outer,int index_mid,int index_inner);
typedef void (*forasync3D_Fct_t)(void *arg, int index_outer, int index_mid,
int index_inner);

/**
* @brief Parallel for loop 'forasync' (up to 3 dimensions).
Expand All @@ -136,8 +146,9 @@ typedef void (*forasync3D_Fct_t) (void * arg,int index_outer,int index_mid,int i
* @param[in] domain Loop domains to iterate over (array of size 'dim').
* @param[in] mode Forasync mode to control chunking strategy (flat chunking or recursive).
*/
void hclib_forasync(void* forasync_fct, void * argv, hclib_ddf_t ** ddf_list, struct _phased_t * phased_clause,
void *accumed_placeholder, int dim, loop_domain_t * domain, forasync_mode_t mode);
void hclib_forasync(void *forasync_fct, void *argv, hclib_ddf_t **ddf_list,
struct _phased_t *phased_clause, void *accumed_placeholder, int dim,
loop_domain_t *domain, forasync_mode_t mode);

/**
* @brief starts a new finish scope
Expand Down
1 change: 1 addition & 0 deletions inc/hclib_cpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ ddf_t *ddf_create();
void ddf_free(ddf_t *ddf);
void ddf_put(ddf_t *ddf, void *datum);
void *ddf_get(ddf_t *ddf);
void *ddf_wait(ddf_t *ddf);

hc_workerState *current_ws();
int current_worker();
Expand Down
34 changes: 34 additions & 0 deletions inc/hcpp-async.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,40 @@ inline void asyncComm(T lambda) {
spawn_commTask(task);
}

template <typename T>
hclib_ddf_t *asyncFuture(T lambda) {
hclib_ddf_t *event = hclib_ddf_create();
/*
* TODO creating this closure may be inefficient. While the capture list is
* precise, if the user-provided lambda is large then copying it by value
* will also take extra time.
*/
auto wrapper = [event, lambda]() {
lambda();
hclib_ddf_put(event, NULL);
};
task_t* task = _allocate_async(wrapper, false);
spawn(task);
return event;
}

template <typename T>
hclib_ddf_t *asyncFutureAwait(hclib_ddf_t **ddf_list, T lambda) {
hclib_ddf_t *event = hclib_ddf_create();
/*
* TODO creating this closure may be inefficient. While the capture list is
* precise, if the user-provided lambda is large then copying it by value
* will also take extra time.
*/
auto wrapper = [event, lambda]() {
lambda();
hclib_ddf_put(event, NULL);
};
task_t* task = _allocate_async(wrapper, true);
spawn_await(task, ddf_list);
return event;
}

inline void finish(std::function<void()> lambda) {
hclib_start_finish();
lambda();
Expand Down
22 changes: 17 additions & 5 deletions inc/hcpp-ddf.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

/*
* hcpp-ddf.h
*
* NOTE: Terminology
* DDF = data-driven future
* DDT = data-driven task (a task that waits on DDF objects)
*
* Author: Vivek Kumar ([email protected])
* Ported from hclib
Expand Down Expand Up @@ -67,15 +71,17 @@ typedef enum DDF_Kind {

/**
* DDT data-structure to associate DDTs and DDFs.
* This is exposed so that the runtime know the size of the struct.
* This is exposed so that the runtime knows the size of the struct.
*/
typedef struct ddt_st {
// NULL-terminated list of DDFs the DDT is registered on
struct hclib_ddf_st ** waitingFrontier;
// This allows us to chain all DDTs waiting on a same DDF
// Whenever a DDT wants to register on a DDF, and that DDF is
// not ready, we chain the current DDT and the DDF's headDDTWaitList
// and try to cas on the DDF's headDDTWaitList, with the current DDT.
/*
* This allows us to chain all DDTs waiting on a same DDF. Whenever a DDT
* wants to register on a DDF, and that DDF is not ready, we chain the
* current DDT and the DDF's headDDTWaitList and try to cas on the DDF's
* headDDTWaitList, with the current DDT.
*/
struct ddt_st * nextDDTWaitingOnSameDDF;
} ddt_t;

Expand Down Expand Up @@ -132,6 +138,12 @@ void * hclib_ddf_get(hclib_ddf_t * ddf);
*/
void hclib_ddf_put(hclib_ddf_t * ddf, void * datum);

/*
* Block the currently executing task on the provided DDF. Returns the datum
* that was put on ddf.
*/
void *hclib_ddf_wait(hclib_ddf_t *ddf);

/*
* Some extras
*/
Expand Down
16 changes: 9 additions & 7 deletions inc/hcpp-rt.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,18 @@ struct hc_deque_t;
struct finish_t;

typedef struct hc_workerState {
pthread_t t; /* the pthread associated */
pthread_t t; // the pthread associated
struct finish_t* current_finish;
struct place_t * pl; /* the directly attached place */
struct place_t ** hpt_path; /* Path from root to worker's leaf place. Array of places. */
struct place_t * pl; // the directly attached place
// Path from root to worker's leaf place. Array of places.
struct place_t ** hpt_path;
struct hc_context * context;
struct hc_workerState * next_worker; /* the link of other ws in the same place */
struct hc_deque_t * current; /* the current deque/place worker is on */
// the link of other ws in the same place
struct hc_workerState * next_worker;
struct hc_deque_t * current; // the current deque/place worker is on
struct hc_deque_t * deques;
int id; /* The id, identify a worker */
int did; /* the mapping device id */
int id; // The id, identify a worker
int did; // the mapping device id
LiteCtx *curr_ctx;
LiteCtx *root_ctx;
} hc_workerState;
Expand Down
24 changes: 24 additions & 0 deletions src/hclib.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,30 @@ void hclib_async(generic_framePtr fp, void *arg, hclib_ddf_t** ddf_list,
}
}

typedef struct _future_args_wrapper {
hclib_ddf_t event;
futureFct_t fp;
void *actual_in;
} future_args_wrapper;

static void future_caller(void *in) {
future_args_wrapper *args = in;
void *user_result = (args->fp)(args->actual_in);
hclib_ddf_put(&args->event, user_result);
}

hclib_ddf_t *hclib_async_future(futureFct_t fp, void *arg,
hclib_ddf_t** ddf_list, struct _phased_t * phased_clause,
int property) {
future_args_wrapper *wrapper = malloc(sizeof(future_args_wrapper));
hclib_ddf_init(&wrapper->event);
wrapper->fp = fp;
wrapper->actual_in = arg;
hclib_async(future_caller, wrapper, ddf_list, phased_clause, property);

return (hclib_ddf_t *)wrapper;
}

/*** END ASYNC IMPLEMENTATION ***/

/*** START FORASYNC IMPLEMENTATION ***/
Expand Down
4 changes: 4 additions & 0 deletions src/hclib_cpp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ int hclib::get_num_places(hclib::place_type_t type) {
void hclib::get_places(hclib::place_t **pls, hclib::place_type_t type) {
hclib_get_places(pls, type);
}

void *hclib::ddf_wait(hclib::ddf_t *ddf) {
return hclib_ddf_wait(ddf);
}
85 changes: 45 additions & 40 deletions src/hcpp-ddf.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// For 'headDDTWaitList' when a DDF has been satisfied
#define DDF_SATISFIED NULL

// Default value of a DDF datum
// #define UNINITIALIZED_DDF_DATA_PTR NULL
#define UNINITIALIZED_DDF_DATA_PTR ((void *) -1)

// For waiting frontier (last element of the list)
#define UNINITIALIZED_DDF_WAITLIST_PTR ((ddt_t *) -1)
#define EMPTY_DDF_WAITLIST_PTR NULL
Expand Down Expand Up @@ -146,15 +142,22 @@ __inline__ int __registerIfDDFnotReady_AND(ddt_t* wrapperTask,
/* waitListOfDDF can not be EMPTY_DDF_WAITLIST_PTR in here*/
wrapperTask->nextDDTWaitingOnSameDDF = waitListOfDDF;

success = __sync_bool_compare_and_swap(&(ddfToCheck -> headDDTWaitList), waitListOfDDF, wrapperTask);
success = __sync_bool_compare_and_swap(
&(ddfToCheck -> headDDTWaitList), waitListOfDDF,
wrapperTask);
/* printf("task:%p registered to DDF:%p\n", pollingTask,ddfToCheck); */

/* may have failed because either some other task tried to be the head or a put occurred. */
if ( !success ) {
waitListOfDDF = ( ddt_t* ) ddfToCheck -> headDDTWaitList;
/* if waitListOfDDF was set to EMPTY_DDF_WAITLIST_PTR, the loop condition will handle that
* if another task was added, now try to add in front of that
* */
/*
* may have failed because either some other task tried to be the
* head or a put occurred.
*/
if (!success) {
waitListOfDDF = (ddt_t*)ddfToCheck->headDDTWaitList;
/*
* if waitListOfDDF was set to EMPTY_DDF_WAITLIST_PTR, the loop
* condition will handle that if another task was added, now try
* to add in front of that
*/
}
}

Expand Down Expand Up @@ -197,37 +200,39 @@ ddt_t * rt_async_task_to_ddt(task_t * async_task) {
* DDF's frontier to try to advance DDTs that were waiting on this DDF.
*/
void hclib_ddf_put(hclib_ddf_t* ddfToBePut, void * datumToBePut) {
HASSERT (datumToBePut != UNINITIALIZED_DDF_DATA_PTR && EMPTY_DATUM_ERROR_MSG);
HASSERT (ddfToBePut != NULL && "can not put into NULL DDF");
HASSERT (ddfToBePut-> datum == UNINITIALIZED_DDF_DATA_PTR && "violated single assignment property for DDFs");

volatile ddt_t* waitListOfDDF = NULL;
ddt_t* currDDT = NULL;
ddt_t* nextDDT = NULL;

ddfToBePut-> datum = datumToBePut;
waitListOfDDF = ddfToBePut->headDDTWaitList;
/*seems like I can not avoid a CAS here*/
while ( !__sync_bool_compare_and_swap( &(ddfToBePut -> headDDTWaitList), waitListOfDDF, EMPTY_DDF_WAITLIST_PTR)) {
waitListOfDDF = ddfToBePut -> headDDTWaitList;
}
HASSERT (datumToBePut != UNINITIALIZED_DDF_DATA_PTR && EMPTY_DATUM_ERROR_MSG);
HASSERT (ddfToBePut != NULL && "can not put into NULL DDF");
HASSERT (ddfToBePut-> datum == UNINITIALIZED_DDF_DATA_PTR &&
"violated single assignment property for DDFs");

volatile ddt_t* waitListOfDDF = NULL;
ddt_t* currDDT = NULL;
ddt_t* nextDDT = NULL;

ddfToBePut-> datum = datumToBePut;
waitListOfDDF = ddfToBePut->headDDTWaitList;
/*seems like I can not avoid a CAS here*/
while (!__sync_bool_compare_and_swap( &(ddfToBePut -> headDDTWaitList),
waitListOfDDF, EMPTY_DDF_WAITLIST_PTR)) {
waitListOfDDF = ddfToBePut -> headDDTWaitList;
}

currDDT = (ddt_t*)waitListOfDDF;
currDDT = (ddt_t*)waitListOfDDF;

int iter_count = 0;
/* printf("DDF:%p was put:%p with value:%d\n", ddfToBePut, datumToBePut,*((int*)datumToBePut)); */
while (currDDT != UNINITIALIZED_DDF_WAITLIST_PTR) {

nextDDT = currDDT->nextDDTWaitingOnSameDDF;
if (iterate_ddt_frontier(currDDT) ) {
/* printf("pushed:%p\n", currDDT); */
/*deque_push_default(currFrame);*/
// DDT eligible to scheduling
task_t * async_task = rt_ddt_to_async_task(currDDT);
if (DEBUG_DDF) { printf("ddf: async_task %p\n", async_task); }
try_schedule_async(async_task, 0);
}
currDDT = nextDDT;
/* printf("DDF:%p was put:%p with value:%d\n", ddfToBePut, datumToBePut,*((int*)datumToBePut)); */
while (currDDT != UNINITIALIZED_DDF_WAITLIST_PTR) {

nextDDT = currDDT->nextDDTWaitingOnSameDDF;
if (iterate_ddt_frontier(currDDT)) {
/* printf("pushed:%p\n", currDDT); */
/*deque_push_default(currFrame);*/
// DDT eligible to scheduling
task_t * async_task = rt_ddt_to_async_task(currDDT);
if (DEBUG_DDF) { printf("ddf: async_task %p\n", async_task); }
try_schedule_async(async_task, 0);
}
currDDT = nextDDT;
iter_count++;
}
}
}
Loading