Skip to content

Commit

Permalink
threadpool: API changes
Browse files Browse the repository at this point in the history
- add hooks: on thread start, on thread stop;
- add thread local storage API: tpt_tls_set(), tpt_tls_get()/tpt_tls_get_sz();
- allow private virtual thread to process messages;
- rename: tp_thread_get_current()->tpt_get_current(), tp_thread_get_cpu_id()->tpt_get_cpu_id(), tp_thread_get_num()->tpt_get_num();
- tp_thread_get() will return NULL for any invalid index istead of last thread in pool;
  • Loading branch information
rozhuk-im committed May 1, 2024
1 parent 41326c3 commit 482bf98
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 107 deletions.
30 changes: 22 additions & 8 deletions include/threadpool/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ typedef struct thread_pool_event_s { /* Thread pool event. */
static const char *tp_ff_time_units[] = { "s", "ms", "us", "ns", NULL };


typedef void (*tpt_hook_cb)(tpt_p tpt);
typedef void (*tp_cb)(tp_event_p ev, tp_udata_p tp_udata);

typedef struct thread_pool_udata_s { /* Thread pool ident and opaque user data. */
Expand Down Expand Up @@ -119,6 +120,9 @@ typedef struct thread_pool_settings_s { /* Settings. */
uint32_t flags; /* TP_S_F_* */
size_t threads_max;
char name[TP_NAME_SIZE]; /* Thread pool name. Used as prefix for threads names. */
tpt_hook_cb tpt_on_start; /* Called by every thread before enter event loop. Can be used with tpt_tls_*() */
tpt_hook_cb tpt_on_stop; /* Called by every thread after exit from event loop, before destroy. */
void *udata; /* Thread pool assosiated user data. See tp_udata_get(). Useful with tpt hooks. */
} tp_settings_t, *tp_settings_p;

#define TP_S_F_BIND2CPU (((uint32_t)1) << 0) /* Bind threads to CPUs. */
Expand Down Expand Up @@ -152,27 +156,37 @@ void tp_shutdown(tp_p tp);
int tp_shutdown_wait(tp_p tp); /* Wait for all threads before return. */
int tp_destroy(tp_p tp);

int tp_threads_create(tp_p tp, int skip_first);
/* Set/get thread pool assosiated user data. On create - set from settings */
int tp_udata_set(tp_p tp, void *udata);
void *tp_udata_get(tp_p tp);

int tp_threads_create(tp_p tp, const int skip_first);
int tp_thread_attach_first(tp_p tp);
int tp_thread_dettach(tpt_p tpt);
size_t tp_thread_count_max_get(tp_p tp);
size_t tp_thread_count_get(tp_p tp);

/* Return tpt_p if caller thread is thread pool thread. */
tpt_p tp_thread_get_current(void);
/* Return non zero if tpt is one of tp threads.
* If tpt is NULL - tp_thread_get_current() used to get current thread tpt. */
* If tpt is NULL - tpt_get_current() used to get current thread tpt. */
int tp_thread_is_tp_thr(tp_p tp, tpt_p tpt);
tpt_p tp_thread_get(tp_p tp, size_t thread_num);
tpt_p tp_thread_get(tp_p tp, const size_t thread_num);
tpt_p tp_thread_get_rr(tp_p tp);
tpt_p tp_thread_get_pvt(tp_p tp); /* Shared virtual thread. */
int tp_thread_get_cpu_id(tpt_p tpt);
size_t tp_thread_get_num(tpt_p tpt);

/* Return tpt_p if caller thread is thread pool thread. */
tpt_p tpt_get_current(void);
int tpt_get_cpu_id(tpt_p tpt);
size_t tpt_get_num(tpt_p tpt);
tp_p tpt_get_tp(tpt_p tpt);
int tpt_is_running(tpt_p tpt);
void *tpt_get_msg_queue(tpt_p tpt);

/* Thread pool thread local storage (TLS). */
#ifndef TP_TPT_TLS_COUNT
# define TP_TPT_TLS_COUNT 2
#endif
int tpt_tls_set(tpt_p tpt, const size_t index, void *val);
void *tpt_tls_get(tpt_p tpt, const size_t index);
size_t tpt_tls_get_sz(tpt_p tpt, const size_t index); /* Same as tpt_tls_get(). */


int tpt_ev_add(tpt_p tpt, tp_event_p ev, tp_udata_p tp_udata);
Expand Down
11 changes: 7 additions & 4 deletions include/utils/macro.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
#include <inttypes.h>
#include <pthread.h>
#include <syslog.h>
#ifdef _MSC_VER
# include <intrin.h>
#else
# include <signal.h>
#endif

#ifndef IOV_MAX
# include <limits.h>
Expand Down Expand Up @@ -151,15 +156,13 @@


__attribute__((gnu_inline, always_inline))
static inline int
static inline void
debug_break(void) {
#ifdef _MSC_VER
# include <intrin.h>
__debugbreak();
#elif 1
#elif 0
__builtin_trap();
#else
# include <signal.h>
raise(SIGTRAP);
#endif
}
Expand Down
22 changes: 11 additions & 11 deletions src/proto/radius_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ radius_client_destroy_tpt_msg_cb(tpt_p tpt, void *udata) {
radius_cli_thr_p thr;
size_t i;

thr = &rad_cli->thr[tp_thread_get_num(tpt)];
thr = &rad_cli->thr[tpt_get_num(tpt)];

if (NULL != thr->skts4.skt) {
for (i = 0; i < thr->skts4.skt_count; i ++) {
Expand Down Expand Up @@ -523,7 +523,7 @@ radius_client_socket_alloc(uint16_t family, radius_cli_thr_p thr) {

if (NULL == thr)
return (EINVAL);
if (tp_thread_get_current() != thr->tpt) {
if (tpt_get_current() != thr->tpt) {
syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!");
return (0);
}
Expand Down Expand Up @@ -578,7 +578,7 @@ radius_client_socket_free(radius_cli_skt_p skt) {
if (NULL == skt)
return;
tpt = skt->thr->tpt;
if (tp_thread_get_current() != tpt) {
if (tpt_get_current() != tpt) {
syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!");
return;
}
Expand Down Expand Up @@ -708,7 +708,7 @@ radius_client_query_done_tpt_msg_cb(tpt_p tpt, void *udata) {
radius_cli_query_p query = (radius_cli_query_p)udata;

SYSLOGD_EX(LOG_DEBUG, "...");
if (tp_thread_get_current() != tpt) {
if (tpt_get_current() != tpt) {
syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!");
return;
}
Expand Down Expand Up @@ -739,7 +739,7 @@ radius_client_query(radius_cli_p rad_cli, tpt_p tpt, size_t query_id,
if (0 != error)
return (error);
/* Switch thread. */
//if (tp_thread_get_current() == tpt) { /* No need to shedule, direct call cb. */
//if (tpt_get_current() == tpt) { /* No need to shedule, direct call cb. */
// radius_client_query_tpt_msg_cb(tpt, query);
//} else {
/* Try send to thread message for server "connections" terminate. */
Expand All @@ -761,7 +761,7 @@ radius_client_query_tpt_msg_cb(tpt_p tpt, void *udata) {
int error;
radius_cli_query_p query = (radius_cli_query_p)udata;

if (tp_thread_get_current() != tpt) {
if (tpt_get_current() != tpt) {
syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!");
return;
}
Expand Down Expand Up @@ -795,7 +795,7 @@ radius_client_send_new(tpt_p tpt, radius_cli_query_p query) {
SYSLOGD_EX(LOG_DEBUG, "...");
if (NULL == query)
return (EINVAL);
if (tp_thread_get_current() != tpt) {
if (tpt_get_current() != tpt) {
syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!");
return (0);
}
Expand All @@ -817,7 +817,7 @@ radius_client_send_new(tpt_p tpt, radius_cli_query_p query) {
MTX_UNLOCK(&rad_cli->cli_srv_mtx);
if (NULL == srv)
return (ECONNREFUSED);
thr = &rad_cli->thr[tp_thread_get_num(tpt)];
thr = &rad_cli->thr[tpt_get_num(tpt)];
skts = ((AF_INET == srv->s.addr.ss_family) ? &thr->skts4 : &thr->skts6);

if (NULL != query->skt && skts == query->skt->skts)
Expand Down Expand Up @@ -912,7 +912,7 @@ radius_client_send(radius_cli_query_p query) {
SYSLOGD_EX(LOG_DEBUG, "...");
if (NULL == query)
return (EINVAL);
if (tp_thread_get_current() != query->skt->thr->tpt) {
if (tpt_get_current() != query->skt->thr->tpt) {
syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!");
return (0);
}
Expand Down Expand Up @@ -943,7 +943,7 @@ radius_client_query_timeout_cb(tp_event_p ev __unused, tp_udata_p tp_udata) {
tpt_ev_enable_args1(0, TP_EV_TIMER, tp_udata);
if (NULL == query) /* Task already done/removed. */
return;
if (tp_thread_get_current() != query->skt->thr->tpt) {
if (tpt_get_current() != query->skt->thr->tpt) {
syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!");
return;
}
Expand Down Expand Up @@ -1002,7 +1002,7 @@ radius_client_recv_cb(tp_task_p tptask __unused, int error,
rad_pkt_hdr_p pkt;

SYSLOGD_EX(LOG_DEBUG, "...");
if (tp_thread_get_current() != skt->thr->tpt) {
if (tpt_get_current() != skt->thr->tpt) {
syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!");
goto rcv_next;
}
Expand Down
Loading

0 comments on commit 482bf98

Please sign in to comment.