Skip to content

Commit

Permalink
Add test for thread pools
Browse files Browse the repository at this point in the history
Also other minor cleanups

Signed-off-by: Quincey Koziol <[email protected]>
  • Loading branch information
qkoziol committed Mar 21, 2024
1 parent 7c40447 commit f79f894
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 29 deletions.
26 changes: 13 additions & 13 deletions src/H5TSpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ H5TS__pool_free(H5TS_pool_t *pool)
HGOTO_DONE(FAIL);

/* Destroy the pool's mutex and condition variable */
if (H5_UNLIKELY(H5TS_mutex_destroy(&pool->mutex)))
if (H5_UNLIKELY(H5TS_mutex_destroy(&pool->mutex) < 0))
HGOTO_DONE(FAIL);
if (H5_UNLIKELY(H5TS_cond_destroy(&pool->cond)))
if (H5_UNLIKELY(H5TS_cond_destroy(&pool->cond) < 0))
HGOTO_DONE(FAIL);

/* Release memory */
Expand Down Expand Up @@ -167,13 +167,13 @@ H5TS__pool_do(void *_pool)
/* Acquire tasks and invoke them, until pool is shut down */
while (1) {
/* Acquire the mutex for the pool */
if (H5_UNLIKELY(H5TS_mutex_lock(&pool->mutex)))
if (H5_UNLIKELY(H5TS_mutex_lock(&pool->mutex) < 0))
HGOTO_DONE((H5TS_thread_ret_t)1);
have_mutex = true;

/* If queue is empty and pool is not shutting down, wait for a task */
while (NULL == pool->head && !pool->shutdown)
if (H5_UNLIKELY(H5TS_cond_wait(&pool->cond, &pool->mutex)))
if (H5_UNLIKELY(H5TS_cond_wait(&pool->cond, &pool->mutex) < 0))
HGOTO_DONE((H5TS_thread_ret_t)1);

/* If there's a task, invoke it, else we're shutting down */
Expand All @@ -188,7 +188,7 @@ H5TS__pool_do(void *_pool)
pool->head = pool->tail = NULL;

/* Release the pool's mutex */
if (H5_UNLIKELY(H5TS_mutex_unlock(&pool->mutex)))
if (H5_UNLIKELY(H5TS_mutex_unlock(&pool->mutex) < 0))
HGOTO_DONE((H5TS_thread_ret_t)1);
have_mutex = false;

Expand All @@ -207,7 +207,7 @@ H5TS__pool_do(void *_pool)
done:
/* Release the pool's mutex, if we're holding it */
if (H5_UNLIKELY(have_mutex))
if (H5_UNLIKELY(H5TS_mutex_unlock(&pool->mutex)))
if (H5_UNLIKELY(H5TS_mutex_unlock(&pool->mutex) < 0))
ret_value = (H5TS_thread_ret_t)1;

FUNC_LEAVE_NOAPI_NAMECHECK_ONLY(ret_value)
Expand Down Expand Up @@ -309,7 +309,7 @@ H5TS_pool_add_task(H5TS_pool_t *pool, H5TS_thread_start_func_t func, void *ctx)
task->next = NULL;

/* Acquire the mutex for the pool */
if (H5_UNLIKELY(H5TS_mutex_lock(&pool->mutex)))
if (H5_UNLIKELY(H5TS_mutex_lock(&pool->mutex) < 0))
HGOTO_DONE(FAIL);
have_mutex = true;

Expand All @@ -329,13 +329,13 @@ H5TS_pool_add_task(H5TS_pool_t *pool, H5TS_thread_start_func_t func, void *ctx)
task = NULL;

/* Wake up any sleeping worker */
if (H5_UNLIKELY(H5TS_cond_signal(&pool->cond)))
if (H5_UNLIKELY(H5TS_cond_signal(&pool->cond) < 0))
HGOTO_DONE(FAIL);

done:
/* Release the pool's mutex, if we're holding it */
if (H5_LIKELY(have_mutex))
if (H5_UNLIKELY(H5TS_mutex_unlock(&pool->mutex)))
if (H5_UNLIKELY(H5TS_mutex_unlock(&pool->mutex) < 0))
ret_value = FAIL;

if (H5_UNLIKELY(ret_value < 0))
Expand Down Expand Up @@ -367,19 +367,19 @@ H5TS_pool_destroy(H5TS_pool_t *pool)
HGOTO_DONE(FAIL);

/* Acquire the mutex for the pool */
if (H5_UNLIKELY(H5TS_mutex_lock(&pool->mutex)))
if (H5_UNLIKELY(H5TS_mutex_lock(&pool->mutex) < 0))
HGOTO_DONE(FAIL);
have_mutex = true;

/* Tell any existing threads that the pool is shutting down */
pool->shutdown = true;

/* Wake all the worker threads up */
if (H5_UNLIKELY(H5TS_cond_broadcast(&pool->cond)))
if (H5_UNLIKELY(H5TS_cond_broadcast(&pool->cond) < 0))
HGOTO_DONE(FAIL);

/* Release the pool's mutex */
if (H5_UNLIKELY(H5TS_mutex_unlock(&pool->mutex)))
if (H5_UNLIKELY(H5TS_mutex_unlock(&pool->mutex) < 0))
HGOTO_DONE(FAIL);
have_mutex = false;

Expand All @@ -390,7 +390,7 @@ H5TS_pool_destroy(H5TS_pool_t *pool)
done:
/* Release the pool's mutex, if we're holding it */
if (H5_UNLIKELY(have_mutex))
if (H5_UNLIKELY(H5TS_mutex_unlock(&pool->mutex)))
if (H5_UNLIKELY(H5TS_mutex_unlock(&pool->mutex) < 0))
ret_value = FAIL;

FUNC_LEAVE_NOAPI_NAMECHECK_ONLY(ret_value)
Expand Down
6 changes: 3 additions & 3 deletions src/H5TSprivate.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@
/* Key destructor callback */
typedef void (*H5TS_key_destructor_func_t)(void *);

/* Thread pool */
typedef struct H5TS_pool_t H5TS_pool_t;

/* Portability aliases */
#ifdef H5_HAVE_WIN_THREADS
typedef HANDLE H5TS_thread_t;
Expand All @@ -96,9 +99,6 @@ typedef pthread_once_t H5TS_once_t;
typedef void (*H5TS_once_init_func_t)(void);
#endif

/* Thread pool */
typedef struct H5TS_pool_t H5TS_pool_t;

/*****************************/
/* Library-private Variables */
/*****************************/
Expand Down
10 changes: 1 addition & 9 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ set (ttsafe_SOURCES
${HDF5_TEST_SOURCE_DIR}/ttsafe_error.c
${HDF5_TEST_SOURCE_DIR}/ttsafe_rec_rw_lock.c
${HDF5_TEST_SOURCE_DIR}/ttsafe_thread_id.c
${HDF5_TEST_SOURCE_DIR}/ttsafe_thread_pool.c
)

set (H5_TESTS
Expand Down Expand Up @@ -423,7 +424,6 @@ set (H5_TESTS
cache_logging
cork
swmr
thread_id # special link
vol
timer
cmpd_dtransform
Expand Down Expand Up @@ -462,7 +462,6 @@ set (H5_TESTS_MULTIPLE
testhdf5
cache_image
ttsafe
thread_id # special link
mirror_vfd
)
# Only build single source tests here
Expand Down Expand Up @@ -585,13 +584,6 @@ if (HDF5_ENABLE_FORMATTERS)
clang_format (HDF5_TEST_ttsafe_FORMAT ttsafe)
endif ()

#-----------------------------------------------------------------------------
# Add Target to clang-format
#-----------------------------------------------------------------------------
if (HDF5_ENABLE_FORMATTERS)
clang_format (HDF5_TEST_thread_id_FORMAT thread_id)
endif ()

if (HDF5_BUILD_UTILS) # requires mirror server
#-- Adding test for mirror_vfd
add_executable (mirror_vfd ${mirror_vfd_SOURCES})
Expand Down
2 changes: 1 addition & 1 deletion test/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ LDADD=libh5test.la $(LIBHDF5)
# List the source files for tests that have more than one
ttsafe_SOURCES=ttsafe.c ttsafe_acreate.c ttsafe_attr_vlen.c ttsafe_cancel.c \
ttsafe_dcreate.c ttsafe_develop.c ttsafe_error.c ttsafe_rec_rw_lock.c \
ttsafe_thread_id.c
ttsafe_thread_id.c ttsafe_thread_pool.c
cache_image_SOURCES=cache_image.c genall5.c
mirror_vfd_SOURCES=mirror_vfd.c genall5.c

Expand Down
1 change: 1 addition & 0 deletions test/ttsafe.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ main(int argc, char *argv[])
AddTest("is_threadsafe", tts_is_threadsafe, NULL, "library threadsafe status", NULL);
#ifdef H5_HAVE_THREADSAFE
AddTest("thread_id", tts_thread_id, NULL, "thread IDs", NULL);
AddTest("thread_pool", tts_thread_pool, NULL, "thread pools", NULL);

AddTest("dcreate", tts_dcreate, cleanup_dcreate, "multi-dataset creation", NULL);
AddTest("error", tts_error, cleanup_error, "per-thread error stacks", NULL);
Expand Down
1 change: 1 addition & 0 deletions test/ttsafe.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ void tts_rec_rw_lock_smoke_check_2(void);
void tts_rec_rw_lock_smoke_check_3(void);
void tts_rec_rw_lock_smoke_check_4(void);
void tts_develop_api(void);
void tts_thread_pool(void);

/* Prototypes for the cleanup routines */
void cleanup_dcreate(void);
Expand Down
21 changes: 18 additions & 3 deletions test/ttsafe_rec_rw_lock.c
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,9 @@ tts_rec_rw_lock_smoke_check_2(void)
udata[i].max_recursive_lock_depth = 10;
}

#if H5TS_ENABLE_REC_RW_LOCK_STATS
uint64_t start_time = H5_now_usec();
#endif
/* 3) Create the reader threads, each with its own user data. */
for (i = 0; i < num_threads; i++)
if (H5TS_thread_create(&threads[i], tts_rw_lock_smoke_check_test_thread, &udata[i]) < 0)
Expand All @@ -617,10 +619,13 @@ tts_rec_rw_lock_smoke_check_2(void)
for (i = 0; i < num_threads; i++)
if (H5TS_thread_join(threads[i], NULL) < 0)
TestErrPrintf("thread %d failed to join", i);
#if H5TS_ENABLE_REC_RW_LOCK_STATS
uint64_t end_time = H5_now_usec();
uint64_t elap_time = (unsigned long long)(end_time - start_time);
fprintf(stdout, "elapsed usec: %llu, usec per lock_cycle = %llu\n", elap_time,
if (verbose)
fprintf(stdout, "elapsed usec: %llu, usec per lock_cycle = %llu\n", elap_time,
(elap_time / (uint64_t)lock_cycles));
#endif

/* 5) Examine the user data from the threads, to determine the
* total number of real and recursive read locks and unlocks.
Expand Down Expand Up @@ -808,7 +813,9 @@ tts_rec_rw_lock_smoke_check_3(void)
udata[i].max_recursive_lock_depth = 10;
}

#if H5TS_ENABLE_REC_RW_LOCK_STATS
uint64_t start_time = H5_now_usec();
#endif
/* 3) Create the writer threads, each with its own user data. */
for (i = 0; i < num_threads; i++)
if (H5TS_thread_create(&threads[i], tts_rw_lock_smoke_check_test_thread, &udata[i]) < 0)
Expand All @@ -818,10 +825,13 @@ tts_rec_rw_lock_smoke_check_3(void)
for (i = 0; i < num_threads; i++)
if (H5TS_thread_join(threads[i], NULL) < 0)
TestErrPrintf("thread %d failed to join", i);
#if H5TS_ENABLE_REC_RW_LOCK_STATS
uint64_t end_time = H5_now_usec();
uint64_t elap_time = (unsigned long long)(end_time - start_time);
fprintf(stdout, "elapsed usec: %llu, usec per lock_cycle = %llu\n", elap_time,
if (verbose)
fprintf(stdout, "elapsed usec: %llu, usec per lock_cycle = %llu\n", elap_time,
(elap_time / (uint64_t)lock_cycles));
#endif

/* 5) Examine the user data from the threads, to determine the
* total number of real and recursive read locks and unlock.
Expand Down Expand Up @@ -1010,7 +1020,9 @@ tts_rec_rw_lock_smoke_check_4(void)
udata[i].max_recursive_lock_depth = 10;
}

#if H5TS_ENABLE_REC_RW_LOCK_STATS
uint64_t start_time = H5_now_usec();
#endif
/* 3) Create the reader threads, each with its own user data. */
for (i = 0; i < num_threads; i++)
if (H5TS_thread_create(&threads[i], tts_rw_lock_smoke_check_test_thread, &udata[i]) < 0)
Expand All @@ -1020,10 +1032,13 @@ tts_rec_rw_lock_smoke_check_4(void)
for (i = 0; i < num_threads; i++)
if (H5TS_thread_join(threads[i], NULL) < 0)
TestErrPrintf("thread %d failed to join", i);
#if H5TS_ENABLE_REC_RW_LOCK_STATS
uint64_t end_time = H5_now_usec();
uint64_t elap_time = (unsigned long long)(end_time - start_time);
fprintf(stdout, "elapsed usec: %llu, usec per lock_cycle = %llu\n", elap_time,
if (verbose)
fprintf(stdout, "elapsed usec: %llu, usec per lock_cycle = %llu\n", elap_time,
(elap_time / (uint64_t)lock_cycles));
#endif

/* 5) Examine the user data from the threads, to determine the
* total number of real and recursive read locks and unlock.
Expand Down
Loading

0 comments on commit f79f894

Please sign in to comment.