Skip to content

Commit

Permalink
Switch out remaining Mercury components
Browse files Browse the repository at this point in the history
Signed-off-by: Quincey Koziol <[email protected]>
  • Loading branch information
qkoziol committed Jul 2, 2024
1 parent 7e37961 commit 25cf7b6
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 25 deletions.
4 changes: 0 additions & 4 deletions src/H5FDsubfiling/H5FDioc_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@

#include "H5subfiling_common.h"

#include "mercury_thread.h"
#include "mercury_thread_pool.h"

/*
* Some definitions for debugging the IOC VFD
*/
Expand Down Expand Up @@ -206,7 +203,6 @@ typedef struct ioc_io_queue_entry {
uint32_t counter;

sf_work_request_t wk_req;
struct hg_thread_work thread_wk;
int wk_ret;

/* statistics */
Expand Down
39 changes: 18 additions & 21 deletions src/H5FDsubfiling/H5FDioc_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
*/
typedef struct ioc_data_t {
ioc_io_queue_t io_queue;
hg_thread_t ioc_main_thread;
hg_thread_pool_t *io_thread_pool;
H5TS_thread_t ioc_main_thread;
H5TS_pool_t *io_thread_pool;
int64_t sf_context_id;

H5TS_atomic_int_t sf_ioc_ready;
Expand All @@ -56,7 +56,7 @@ static double sf_queue_delay_time = 0.0;
#endif

/* Prototypes */
static HG_THREAD_RETURN_TYPE H5FD__ioc_thread_main(void *arg);
static H5TS_THREAD_RETURN_TYPE H5FD__ioc_thread_main(void *arg);

static int H5FD__ioc_file_queue_write_indep(sf_work_request_t *msg, int ioc_idx, int source, MPI_Comm comm,
uint32_t counter);
Expand Down Expand Up @@ -138,11 +138,11 @@ H5FD__ioc_init_threads(void *_sf_context)
}

/* Initialize a thread pool for the I/O concentrator's worker threads */
if (hg_thread_pool_init(thread_pool_size, &ioc_data->io_thread_pool) < 0)
if (H5TS_pool_create(&ioc_data->io_thread_pool, thread_pool_size) < 0)
HGOTO_ERROR(H5E_VFL, H5E_CANTINIT, FAIL, "can't initialize IOC worker thread pool");

/* Create the main IOC thread that will receive and dispatch I/O requests */
if (hg_thread_create(&ioc_data->ioc_main_thread, H5FD__ioc_thread_main, ioc_data) < 0)
if (H5TS_thread_create(&ioc_data->ioc_main_thread, H5FD__ioc_thread_main, ioc_data) < 0)
HGOTO_ERROR(H5E_VFL, H5E_CANTINIT, FAIL, "can't create IOC main thread");

/* Wait until H5FD__ioc_thread_main() reports that it is ready */
Expand Down Expand Up @@ -191,12 +191,12 @@ H5FD__ioc_finalize_threads(void *_sf_context)

/* Tear down IOC worker thread pool */
assert(0 == H5TS_atomic_load_int(&ioc_data->sf_io_ops_pending));
hg_thread_pool_destroy(ioc_data->io_thread_pool);
H5TS_pool_destroy(ioc_data->io_thread_pool);

H5TS_mutex_destroy(&ioc_data->io_queue.q_mutex);

/* Wait for IOC main thread to exit */
hg_thread_join(ioc_data->ioc_main_thread);
H5TS_thread_join(ioc_data->ioc_main_thread, NULL);

/* Destroy atomic vars */
H5TS_atomic_destroy_int(&ioc_data->sf_ioc_ready);
Expand Down Expand Up @@ -273,14 +273,14 @@ H5FD__ioc_finalize_threads(void *_sf_context)
*
*-------------------------------------------------------------------------
*/
static HG_THREAD_RETURN_TYPE
static H5TS_THREAD_RETURN_TYPE
H5FD__ioc_thread_main(void *arg)
{
ioc_data_t *ioc_data = (ioc_data_t *)arg;
subfiling_context_t *context = NULL;
sf_work_request_t wk_req;
int shutdown_requested;
hg_thread_ret_t ret_value = (hg_thread_ret_t)SUCCEED;
H5TS_thread_ret_t ret_value = (H5TS_thread_ret_t)SUCCEED;

assert(ioc_data);

Expand Down Expand Up @@ -308,23 +308,23 @@ H5FD__ioc_thread_main(void *arg)
/* Probe for incoming work requests */
if (MPI_SUCCESS !=
(mpi_code = (MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, context->sf_msg_comm, &flag, &status))))
HGOTO_DONE((hg_thread_ret_t)FAIL);
HGOTO_DONE((H5TS_thread_ret_t)FAIL);

if (flag) {
int count;
int source = status.MPI_SOURCE;
int tag = status.MPI_TAG;

if (tag != READ_INDEP && tag != WRITE_INDEP && tag != TRUNC_OP && tag != GET_EOF_OP)
HGOTO_DONE((hg_thread_ret_t)FAIL);
HGOTO_DONE((H5TS_thread_ret_t)FAIL);

if (MPI_SUCCESS != (mpi_code = MPI_Get_count(&status, MPI_BYTE, &count)))
HGOTO_DONE((hg_thread_ret_t)FAIL);
HGOTO_DONE((H5TS_thread_ret_t)FAIL);

if (count < 0)
HGOTO_DONE((hg_thread_ret_t)FAIL);
HGOTO_DONE((H5TS_thread_ret_t)FAIL);
if ((size_t)count > sizeof(sf_work_request_t))
HGOTO_DONE((hg_thread_ret_t)FAIL);
HGOTO_DONE((H5TS_thread_ret_t)FAIL);

/*
* Zero out work request, since the received message should
Expand All @@ -333,7 +333,7 @@ H5FD__ioc_thread_main(void *arg)
memset(&wk_req, 0, sizeof(sf_work_request_t));
if (MPI_SUCCESS != (mpi_code = MPI_Recv(&wk_req, count, MPI_BYTE, source, tag,
context->sf_msg_comm, MPI_STATUS_IGNORE)))
HGOTO_DONE((hg_thread_ret_t)FAIL);
HGOTO_DONE((H5TS_thread_ret_t)FAIL);

/* Dispatch work request to worker threads in thread pool */

Expand Down Expand Up @@ -419,7 +419,7 @@ translate_opcode(io_op_t op)
*
*-------------------------------------------------------------------------
*/
static HG_THREAD_RETURN_TYPE
static H5TS_THREAD_RETURN_TYPE
H5FD__ioc_handle_work_request(void *arg)
{
ioc_io_queue_entry_t *q_entry_ptr = (ioc_io_queue_entry_t *)arg;
Expand All @@ -428,7 +428,7 @@ H5FD__ioc_handle_work_request(void *arg)
ioc_data_t *ioc_data = NULL;
int64_t file_context_id = msg->context_id;
int op_ret;
hg_thread_ret_t ret_value = 0;
H5TS_thread_ret_t ret_value = 0;

assert(q_entry_ptr);
assert(q_entry_ptr->magic == H5FD_IOC__IO_Q_ENTRY_MAGIC);
Expand Down Expand Up @@ -1366,9 +1366,6 @@ H5FD__ioc_io_queue_dispatch_eligible_entries(ioc_data_t *ioc_data, bool try_lock
assert(ioc_data->io_queue.num_pending + ioc_data->io_queue.num_in_progress ==
ioc_data->io_queue.q_len);

entry_ptr->thread_wk.func = H5FD__ioc_handle_work_request;
entry_ptr->thread_wk.args = entry_ptr;

#ifdef H5_SUBFILING_DEBUG
H5FD__subfiling_log(
entry_ptr->wk_req.context_id,
Expand All @@ -1389,7 +1386,7 @@ H5FD__ioc_io_queue_dispatch_eligible_entries(ioc_data_t *ioc_data, bool try_lock
entry_ptr->dispatch_time = H5_now_usec();
#endif

hg_thread_pool_post(ioc_data->io_thread_pool, &(entry_ptr->thread_wk));
H5TS_pool_add_task(ioc_data->io_thread_pool, H5FD__ioc_handle_work_request, entry_ptr);
}
}

Expand Down

0 comments on commit 25cf7b6

Please sign in to comment.