Skip to content

Commit

Permalink
Use H5TS_barrier_t in test/ttsafe_cancel.c; use malloc for thread poo…
Browse files Browse the repository at this point in the history
…l tasks

Also fix a few small memory leaks in the MPI-IO VFD and the testpar/t_vf.c code.

Signed-off-by: Quincey Koziol <[email protected]>
  • Loading branch information
qkoziol committed Mar 21, 2024
1 parent f337710 commit 681fb1e
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 70 deletions.
2 changes: 1 addition & 1 deletion src/H5FDmpio.c
Original file line number Diff line number Diff line change
Expand Up @@ -1959,7 +1959,7 @@ H5FD__mpio_vector_build_types(uint32_t count, H5FD_mem_t types[], haddr_t addrs[

done:
/* free sorted vectors if they exist */
if (!vector_was_sorted)
if (!*vector_was_sorted)
if (s_types) {
free(s_types);
s_types = NULL;
Expand Down
32 changes: 15 additions & 17 deletions src/H5TSpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ static const H5TS_pool_t H5TS_pool_def = H5TS_POOL_INIT;
/* Declare a free list to manage H5TS_pool_t structs */
H5FL_DEFINE_STATIC(H5TS_pool_t);

/* Declare a free list to manage H5TS_pool_task_t structs */
H5FL_DEFINE_STATIC(H5TS_pool_task_t);

/* Declare a free list to manage sequences of H5TS_thread_t structs */
H5FL_SEQ_DEFINE_STATIC(H5TS_thread_t);

Expand Down Expand Up @@ -181,7 +178,7 @@ H5TS__pool_do(void *_pool)

/* If there's a task, invoke it, else we're shutting down */
if (NULL != pool->head) {
H5TS_pool_task_t *task; /* Task to invoke */
H5TS_pool_task_t *task; /* Task to invoke */

/* Grab our task */
task = pool->head;
Expand All @@ -199,7 +196,7 @@ H5TS__pool_do(void *_pool)
(*task->func)(task->ctx);

/* Free the task node */
H5FL_FREE(H5TS_pool_task_t, task);
free(task);
}
else {
assert(pool->shutdown);
Expand All @@ -209,7 +206,7 @@ H5TS__pool_do(void *_pool)

done:
/* Release the pool's mutex, if we're holding it */
if (have_mutex)
if (H5_UNLIKELY(have_mutex))
if (H5_UNLIKELY(H5TS_mutex_unlock(&pool->mutex)))
ret_value = (H5TS_thread_ret_t)1;

Expand Down Expand Up @@ -252,18 +249,16 @@ H5TS_pool_create(H5TS_pool_t **pool, unsigned num_threads)

/* Start worker threads */
for (unsigned u = 0; u < num_threads; u++) {
/* Create & detach thread */
/* Create thread, which waits to process tasks until the pool is active */
if (H5_UNLIKELY(H5TS_thread_create(&new_pool->threads[u], H5TS__pool_do, (void *)new_pool) < 0))
HGOTO_DONE(FAIL);
if (H5_UNLIKELY(H5TS_thread_detach(new_pool->threads[u]) < 0))
HGOTO_DONE(FAIL);

/* Increment # of threads successfully created */
new_pool->num_threads++;
}
assert(num_threads == new_pool->num_threads);

/* Mark pool as active */
/* Mark pool as active, so that threads start processing tasks */
new_pool->active = true;

/* Set return value */
Expand Down Expand Up @@ -307,7 +302,7 @@ H5TS_pool_add_task(H5TS_pool_t *pool, H5TS_thread_start_func_t func, void *ctx)
HGOTO_DONE(FAIL);

/* Allocate & initialize new task */
if (H5_UNLIKELY(NULL == (task = H5FL_MALLOC(H5TS_pool_task_t))))
if (H5_UNLIKELY(NULL == (task = malloc(sizeof(H5TS_pool_task_t)))))
HGOTO_DONE(FAIL);
task->func = func;
task->ctx = ctx;
Expand All @@ -330,20 +325,23 @@ H5TS_pool_add_task(H5TS_pool_t *pool, H5TS_thread_start_func_t func, void *ctx)
else
pool->head = pool->tail = task;

/* Avoid freeing the task on error, now */
task = NULL;

/* Wake up any sleeping worker */
if (H5_UNLIKELY(H5TS_cond_signal(&pool->cond)))
HGOTO_DONE(FAIL);

done:
if (H5_UNLIKELY(ret_value < 0))
if (task)
H5FL_FREE(H5TS_pool_task_t, task);

/* Release the pool's mutex, if we're holding it */
if (have_mutex)
if (H5_LIKELY(have_mutex))
if (H5_UNLIKELY(H5TS_mutex_unlock(&pool->mutex)))
ret_value = FAIL;

if (H5_UNLIKELY(ret_value < 0))
if (task)
free(task);

FUNC_LEAVE_NOAPI_NAMECHECK_ONLY(ret_value)
} /* end H5TS_pool_add_task() */

Expand Down Expand Up @@ -391,7 +389,7 @@ H5TS_pool_destroy(H5TS_pool_t *pool)

done:
/* Release the pool's mutex, if we're holding it */
if (have_mutex)
if (H5_UNLIKELY(have_mutex))
if (H5_UNLIKELY(H5TS_mutex_unlock(&pool->mutex)))
ret_value = FAIL;

Expand Down
52 changes: 11 additions & 41 deletions test/ttsafe_cancel.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
* The main thread spawns a child to perform a series of dataset writes
* to a hdf5 file. The main thread and child thread synchronizes within
* a callback function called during a H5Diterate call afterwhich the
* a callback function called during a H5Diterate call after which the
* main thread attempts to cancel the child thread.
*
* The cancellation should only work after the child thread has safely
Expand Down Expand Up @@ -48,21 +48,18 @@ typedef struct cleanup_struct {
} cancel_cleanup_t;

pthread_t childthread;
pthread_mutex_t mutex;
pthread_cond_t cond;
static H5TS_barrier_t barrier;

void
tts_cancel(void)
{
hid_t dataset;
int buffer;
int H5_ATTR_NDEBUG_UNUSED ret;
int ret;

/* Initialize mutex & condition variables */
ret = pthread_mutex_init(&mutex, NULL);
assert(ret == 0);
ret = pthread_cond_init(&cond, NULL);
assert(ret == 0);
/* Initialize barrier */
ret = H5TS__barrier_init(&barrier, 2);
CHECK_I(ret, "H5TS__barrier_init");

/*
* Create a hdf5 file using H5F_ACC_TRUNC access, default file
Expand All @@ -72,7 +69,7 @@ tts_cancel(void)
assert(cancel_file >= 0);
ret = pthread_create(&childthread, NULL, tts_cancel_thread, NULL);
assert(ret == 0);
tts_cancel_barrier();
H5TS__barrier_wait(&barrier);
ret = pthread_cancel(childthread);
assert(ret == 0);

Expand All @@ -88,6 +85,9 @@ tts_cancel(void)
assert(ret >= 0);
ret = H5Fclose(cancel_file);
assert(ret >= 0);

ret = H5TS__barrier_destroy(&barrier);
CHECK_I(ret, "H5TS__barrier_destroy");
} /* end tts_cancel() */

void *
Expand Down Expand Up @@ -164,7 +164,7 @@ tts_cancel_callback(void *elem, hid_t H5_ATTR_UNUSED type_id, unsigned H5_ATTR_U
int value = *(int *)elem;
herr_t status;

tts_cancel_barrier();
H5TS__barrier_wait(&barrier);
HDsleep(3);

if (value != 1) {
Expand Down Expand Up @@ -195,38 +195,8 @@ cancellation_cleanup(void *arg)
CHECK(status, FAIL, "H5Tclose");
status = H5Sclose(cleanup_structure->dataspace);
CHECK(status, FAIL, "H5Sclose");

/* retained for debugging */
/* print_func("cancellation noted, cleaning up ... \n"); */
} /* end cancellation_cleanup() */

/*
* Artificial (and specific to this test) barrier to keep track of whether
* both the main and child threads have reached a point in the program.
*/
void
tts_cancel_barrier(void)
{
static int count = 2;
int status;

status = pthread_mutex_lock(&mutex);
VERIFY(status, 0, "pthread_mutex_lock");

if (count != 1) {
count--;
status = pthread_cond_wait(&cond, &mutex);
VERIFY(status, 0, "pthread_cond_wait");
}
else {
status = pthread_cond_signal(&cond);
VERIFY(status, 0, "pthread_cond_signal");
}

status = pthread_mutex_unlock(&mutex);
VERIFY(status, 0, "pthread_mutex_unlock");
} /* end tts_cancel_barrier() */

void
cleanup_cancel(void)
{
Expand Down
3 changes: 1 addition & 2 deletions test/ttsafe_thread_id.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#define CYCLE_COUNT 2
#define NTHREADS 5

static volatile bool failed = false;
static H5TS_barrier_t barrier;
static int times;
static bool used[NTHREADS * CYCLE_COUNT];
Expand Down Expand Up @@ -130,7 +129,7 @@ tts_thread_id(void)
}
}
result = H5TS__barrier_destroy(&barrier);
CHECK_I(result, "H5TS__barrier_init");
CHECK_I(result, "H5TS__barrier_destroy");

} /* end tts_thread_id() */

Expand Down
40 changes: 31 additions & 9 deletions testpar/t_vfd.c
Original file line number Diff line number Diff line change
Expand Up @@ -4994,6 +4994,9 @@ test_vector_io(int mpi_rank, int mpi_size)
H5FD_SUBFILING_NAME);
#endif

/* discard the file image buffers */
free_file_images();

nerrors += (int)nerrs;

/* return(nerrs);*/
Expand Down Expand Up @@ -5048,6 +5051,11 @@ test_selection_io_read_verify(hid_t dxpl, int mpi_rank, hsize_t start[], hsize_t
(void **)rbufs) < 0)
goto error;

/* Pop API context */
if (api_ctx_pushed)
H5CX_pop(false);
api_ctx_pushed = false;

/* Verify result */
for (i = 0; i < (int)rbufcount; i++) {
hsize_t endblock = MIN((start[i] + block[i]), (hsize_t)(sel_dim0 * sel_dim1));
Expand Down Expand Up @@ -5127,6 +5135,11 @@ test_selection_io_write(hid_t dxpl, H5FD_t *lf, H5FD_mem_t type, uint32_t count,
if (H5FDwrite_selection(lf, type, dxpl, count, mem_spaces, file_spaces, offsets, element_sizes, bufs) < 0)
goto error;

/* Pop API context */
if (api_ctx_pushed)
H5CX_pop(false);
api_ctx_pushed = false;

if (bufs)
free(bufs);

Expand Down Expand Up @@ -6459,18 +6472,30 @@ test_selection_io_real(int mpi_rank, int mpi_size, H5FD_t *lf, hid_t dxpl)
}

/* Free the buffers */
if (wbuf1)
if (wbuf1) {
free(wbuf1);
if (wbuf2)
wbuf1 = NULL;
}
if (wbuf2) {
free(wbuf2);
if (fbuf1)
wbuf2 = NULL;
}
if (fbuf1) {
free(fbuf1);
if (fbuf2)
fbuf1 = NULL;
}
if (fbuf2) {
free(fbuf2);
if (erbuf1)
fbuf2 = NULL;
}
if (erbuf1) {
free(erbuf1);
if (erbuf2)
erbuf1 = NULL;
}
if (erbuf2) {
free(erbuf2);
erbuf2 = NULL;
}

CHECK_PASSED();

Expand Down Expand Up @@ -6731,9 +6756,6 @@ main(int argc, char **argv)
printf("===================================\n");
}

/* discard the file image buffers */
free_file_images();

/* close HDF5 library */
H5close();

Expand Down

0 comments on commit 681fb1e

Please sign in to comment.