diff --git a/include/threadpool/threadpool.h b/include/threadpool/threadpool.h index 7e8d6af..4e15a24 100644 --- a/include/threadpool/threadpool.h +++ b/include/threadpool/threadpool.h @@ -91,6 +91,7 @@ typedef struct thread_pool_event_s { /* Thread pool event. */ static const char *tp_ff_time_units[] = { "s", "ms", "us", "ns", NULL }; +typedef void (*tpt_hook_cb)(tpt_p tpt); typedef void (*tp_cb)(tp_event_p ev, tp_udata_p tp_udata); typedef struct thread_pool_udata_s { /* Thread pool ident and opaque user data. */ @@ -119,6 +120,9 @@ typedef struct thread_pool_settings_s { /* Settings. */ uint32_t flags; /* TP_S_F_* */ size_t threads_max; char name[TP_NAME_SIZE]; /* Thread pool name. Used as prefix for threads names. */ + tpt_hook_cb tpt_on_start; /* Called by every thread before enter event loop. Can be used with tpt_tls_*() */ + tpt_hook_cb tpt_on_stop; /* Called by every thread after exit from event loop, before destroy. */ + void *udata; /* Thread pool assosiated user data. See tp_udata_get(). Useful with tpt hooks. */ } tp_settings_t, *tp_settings_p; #define TP_S_F_BIND2CPU (((uint32_t)1) << 0) /* Bind threads to CPUs. */ @@ -152,27 +156,37 @@ void tp_shutdown(tp_p tp); int tp_shutdown_wait(tp_p tp); /* Wait for all threads before return. */ int tp_destroy(tp_p tp); -int tp_threads_create(tp_p tp, int skip_first); +/* Set/get thread pool assosiated user data. On create - set from settings */ +int tp_udata_set(tp_p tp, void *udata); +void *tp_udata_get(tp_p tp); + +int tp_threads_create(tp_p tp, const int skip_first); int tp_thread_attach_first(tp_p tp); int tp_thread_dettach(tpt_p tpt); size_t tp_thread_count_max_get(tp_p tp); size_t tp_thread_count_get(tp_p tp); -/* Return tpt_p if caller thread is thread pool thread. */ -tpt_p tp_thread_get_current(void); /* Return non zero if tpt is one of tp threads. - * If tpt is NULL - tp_thread_get_current() used to get current thread tpt. */ + * If tpt is NULL - tpt_get_current() used to get current thread tpt. */ int tp_thread_is_tp_thr(tp_p tp, tpt_p tpt); -tpt_p tp_thread_get(tp_p tp, size_t thread_num); +tpt_p tp_thread_get(tp_p tp, const size_t thread_num); tpt_p tp_thread_get_rr(tp_p tp); tpt_p tp_thread_get_pvt(tp_p tp); /* Shared virtual thread. */ -int tp_thread_get_cpu_id(tpt_p tpt); -size_t tp_thread_get_num(tpt_p tpt); +/* Return tpt_p if caller thread is thread pool thread. */ +tpt_p tpt_get_current(void); +int tpt_get_cpu_id(tpt_p tpt); +size_t tpt_get_num(tpt_p tpt); tp_p tpt_get_tp(tpt_p tpt); int tpt_is_running(tpt_p tpt); void *tpt_get_msg_queue(tpt_p tpt); - +/* Thread pool thread local storage (TLS). */ +#ifndef TP_TPT_TLS_COUNT +# define TP_TPT_TLS_COUNT 2 +#endif +int tpt_tls_set(tpt_p tpt, const size_t index, void *val); +void *tpt_tls_get(tpt_p tpt, const size_t index); +size_t tpt_tls_get_sz(tpt_p tpt, const size_t index); /* Same as tpt_tls_get(). */ int tpt_ev_add(tpt_p tpt, tp_event_p ev, tp_udata_p tp_udata); diff --git a/include/utils/macro.h b/include/utils/macro.h index fbc2f45..c714e70 100644 --- a/include/utils/macro.h +++ b/include/utils/macro.h @@ -37,6 +37,11 @@ #include #include #include +#ifdef _MSC_VER +# include +#else +# include +#endif #ifndef IOV_MAX # include @@ -151,15 +156,13 @@ __attribute__((gnu_inline, always_inline)) -static inline int +static inline void debug_break(void) { #ifdef _MSC_VER -# include __debugbreak(); -#elif 1 +#elif 0 __builtin_trap(); #else -# include raise(SIGTRAP); #endif } diff --git a/src/proto/radius_client.c b/src/proto/radius_client.c index 6c85a4c..45237c2 100644 --- a/src/proto/radius_client.c +++ b/src/proto/radius_client.c @@ -436,7 +436,7 @@ radius_client_destroy_tpt_msg_cb(tpt_p tpt, void *udata) { radius_cli_thr_p thr; size_t i; - thr = &rad_cli->thr[tp_thread_get_num(tpt)]; + thr = &rad_cli->thr[tpt_get_num(tpt)]; if (NULL != thr->skts4.skt) { for (i = 0; i < thr->skts4.skt_count; i ++) { @@ -523,7 +523,7 @@ radius_client_socket_alloc(uint16_t family, radius_cli_thr_p thr) { if (NULL == thr) return (EINVAL); - if (tp_thread_get_current() != thr->tpt) { + if (tpt_get_current() != thr->tpt) { syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!"); return (0); } @@ -578,7 +578,7 @@ radius_client_socket_free(radius_cli_skt_p skt) { if (NULL == skt) return; tpt = skt->thr->tpt; - if (tp_thread_get_current() != tpt) { + if (tpt_get_current() != tpt) { syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!"); return; } @@ -708,7 +708,7 @@ radius_client_query_done_tpt_msg_cb(tpt_p tpt, void *udata) { radius_cli_query_p query = (radius_cli_query_p)udata; SYSLOGD_EX(LOG_DEBUG, "..."); - if (tp_thread_get_current() != tpt) { + if (tpt_get_current() != tpt) { syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!"); return; } @@ -739,7 +739,7 @@ radius_client_query(radius_cli_p rad_cli, tpt_p tpt, size_t query_id, if (0 != error) return (error); /* Switch thread. */ - //if (tp_thread_get_current() == tpt) { /* No need to shedule, direct call cb. */ + //if (tpt_get_current() == tpt) { /* No need to shedule, direct call cb. */ // radius_client_query_tpt_msg_cb(tpt, query); //} else { /* Try send to thread message for server "connections" terminate. */ @@ -761,7 +761,7 @@ radius_client_query_tpt_msg_cb(tpt_p tpt, void *udata) { int error; radius_cli_query_p query = (radius_cli_query_p)udata; - if (tp_thread_get_current() != tpt) { + if (tpt_get_current() != tpt) { syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!"); return; } @@ -795,7 +795,7 @@ radius_client_send_new(tpt_p tpt, radius_cli_query_p query) { SYSLOGD_EX(LOG_DEBUG, "..."); if (NULL == query) return (EINVAL); - if (tp_thread_get_current() != tpt) { + if (tpt_get_current() != tpt) { syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!"); return (0); } @@ -817,7 +817,7 @@ radius_client_send_new(tpt_p tpt, radius_cli_query_p query) { MTX_UNLOCK(&rad_cli->cli_srv_mtx); if (NULL == srv) return (ECONNREFUSED); - thr = &rad_cli->thr[tp_thread_get_num(tpt)]; + thr = &rad_cli->thr[tpt_get_num(tpt)]; skts = ((AF_INET == srv->s.addr.ss_family) ? &thr->skts4 : &thr->skts6); if (NULL != query->skt && skts == query->skt->skts) @@ -912,7 +912,7 @@ radius_client_send(radius_cli_query_p query) { SYSLOGD_EX(LOG_DEBUG, "..."); if (NULL == query) return (EINVAL); - if (tp_thread_get_current() != query->skt->thr->tpt) { + if (tpt_get_current() != query->skt->thr->tpt) { syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!"); return (0); } @@ -943,7 +943,7 @@ radius_client_query_timeout_cb(tp_event_p ev __unused, tp_udata_p tp_udata) { tpt_ev_enable_args1(0, TP_EV_TIMER, tp_udata); if (NULL == query) /* Task already done/removed. */ return; - if (tp_thread_get_current() != query->skt->thr->tpt) { + if (tpt_get_current() != query->skt->thr->tpt) { syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!"); return; } @@ -1002,7 +1002,7 @@ radius_client_recv_cb(tp_task_p tptask __unused, int error, rad_pkt_hdr_p pkt; SYSLOGD_EX(LOG_DEBUG, "..."); - if (tp_thread_get_current() != skt->thr->tpt) { + if (tpt_get_current() != skt->thr->tpt) { syslog(LOG_DEBUG, "tpt MISSMATCH!!!!!!!!!"); goto rcv_next; } diff --git a/src/threadpool/threadpool.c b/src/threadpool/threadpool.c index 6d78050..260cef9 100644 --- a/src/threadpool/threadpool.c +++ b/src/threadpool/threadpool.c @@ -91,7 +91,9 @@ static tp_p g_tp = NULL; #ifdef BSD /* BSD specific code. */ -#define TPT_ITEM_EV_COUNT 64 +#ifndef TPT_ITEM_EV_COUNT +# define TPT_ITEM_EV_COUNT 4 +#endif static const u_short tp_op_to_flags_kq_map[] = { (EV_ADD | EV_ENABLE), /* 0: TP_CTL_ADD */ @@ -168,6 +170,7 @@ typedef struct thread_pool_thread_s { /* thread pool thread info */ tp_udata_t pvt_udata; /* Pool virtual thread support. */ #endif /* Linux specific code. */ tp_p tp; /* */ + void *tls[TP_TPT_TLS_COUNT]; /* Thread local storage. */ } tp_thread_t; #define TP_THREAD_STATE_STOP 0 @@ -180,11 +183,9 @@ typedef struct thread_pool_s { /* thread pool */ tpt_p pvt; /* Pool virtual thread. */ volatile size_t rr_idx; volatile size_t shutdown; - uint32_t flags; - char name[TP_NAME_SIZE]; size_t cpu_count; uintptr_t fd_count; - size_t threads_max; + tp_settings_t s; volatile size_t threads_cnt; /* Worker threads count. */ tp_thread_t threads[]; /* Worker threads. */ } tp_t; @@ -270,12 +271,12 @@ tpt_data_event_init(tpt_p tpt) { struct kevent kev; tpt->io_fd = (uintptr_t)kqueuex( - (0 != (TP_S_F_CLOEXEC & tpt->tp->flags) ? KQUEUE_CLOEXEC : 0)); + (0 != (TP_S_F_CLOEXEC & tpt->tp->s.flags) ? KQUEUE_CLOEXEC : 0)); if ((uintptr_t)-1 == tpt->io_fd) return (errno); /* Init threads message exchange. */ tpt->msg_queue = tpt_msg_queue_create(tpt, - (0 != (TP_S_F_CLOEXEC & tpt->tp->flags) ? TP_MSG_Q_F_CLOEXEC : 0)); + (0 != (TP_S_F_CLOEXEC & tpt->tp->s.flags) ? TP_MSG_Q_F_CLOEXEC : 0)); if (NULL == tpt->msg_queue) return (errno); if (NULL != tpt->tp->pvt && @@ -550,12 +551,12 @@ tpt_data_event_init(tpt_p tpt) { tp_event_t ev; tpt->io_fd = epoll_create1( - (0 != (TP_S_F_CLOEXEC & tpt->tp->flags) ? EPOLL_CLOEXEC : 0)); + (0 != (TP_S_F_CLOEXEC & tpt->tp->s.flags) ? EPOLL_CLOEXEC : 0)); if ((uintptr_t)-1 == tpt->io_fd) return (errno); /* Init threads message exchange. */ tpt->msg_queue = tpt_msg_queue_create(tpt, - (0 != (TP_S_F_CLOEXEC & tpt->tp->flags) ? TP_MSG_Q_F_CLOEXEC : 0)); + (0 != (TP_S_F_CLOEXEC & tpt->tp->s.flags) ? TP_MSG_Q_F_CLOEXEC : 0)); if (NULL == tpt->msg_queue) return (errno); if (NULL != tpt->tp->pvt && @@ -994,17 +995,21 @@ tp_create(tp_settings_p s, tp_p *ptp) { if (NULL == tp) return (ENOMEM); fd_max_count = (uintptr_t)getdtablesize(); - tp->flags = s->flags; - memcpy(tp->name, s->name, TP_NAME_SIZE); + memcpy(&tp->s, s, sizeof(tp_settings_t)); tp->cpu_count = cpu_count; - tp->threads_max = s->threads_max; tp->fd_count = fd_max_count; + /* Private virtual thread. */ tp->pvt = &tp->threads[s->threads_max]; - error = tpt_data_init(tp, -1, (size_t)~0, &tp->threads[s->threads_max]); + error = tpt_data_init(tp, -1, s->threads_max, &tp->threads[s->threads_max]); if (0 != error) { SYSLOG_ERR(LOG_CRIT, error, "tpt_data_init() - pvt."); goto err_out; } + tp->pvt->state = TP_THREAD_STATE_RUNNING; + if (NULL != s->tpt_on_start) { + s->tpt_on_start(tp->pvt); + } + for (i = 0, cur_cpu = 0; i < s->threads_max; i ++, cur_cpu ++) { if (0 != (TP_S_F_BIND2CPU & s->flags)) { if ((size_t)cur_cpu >= cpu_count) { @@ -1041,8 +1046,13 @@ tp_shutdown(tp_p tp) { if (0 != tp->shutdown) return; tp->shutdown ++; + /* Private virtual thread. */ + tp->pvt->state = TP_THREAD_STATE_STOP; + if (NULL != tp->s.tpt_on_start) { + tp->s.tpt_on_start(tp->pvt); + } /* Shutdown threads. */ - for (size_t i = 0; i < tp->threads_max; i ++) { + for (size_t i = 0; i < tp->s.threads_max; i ++) { if (0 == tpt_is_running(&tp->threads[i])) continue; tpt_msg_send(&tp->threads[i], NULL, 0, @@ -1064,7 +1074,7 @@ tp_shutdown_wait(tp_p tp) { if (0 != tp_thread_is_tp_thr(tp, NULL)) return (EDEADLK); - for (size_t i = 0; i < tp->threads_max; i ++) { + for (size_t i = 0; i < tp->s.threads_max; i ++) { if (TP_THREAD_STATE_STOP == tp->threads[i].state) continue; error = pthread_join(tp->threads[i].pt_id, NULL); @@ -1105,7 +1115,7 @@ tp_destroy(tp_p tp) { return (error); /* Free resources. */ tpt_data_uninit(tp->pvt); - for (size_t i = 0; i < tp->threads_max; i ++) { + for (size_t i = 0; i < tp->s.threads_max; i ++) { tpt_data_uninit(&tp->threads[i]); } free(tp); @@ -1115,7 +1125,27 @@ tp_destroy(tp_p tp) { int -tp_threads_create(tp_p tp, int skip_first) { +tp_udata_set(tp_p tp, void *udata) { + + if (NULL == tp) + return (EINVAL); + tp->s.udata = udata; + + return (0); +} + +void * +tp_udata_get(tp_p tp) { + + if (NULL == tp) + return (NULL); + return (tp->s.udata); +} + + + +int +tp_threads_create(tp_p tp, const int skip_first) { tpt_p tpt; if (NULL == tp) @@ -1123,7 +1153,7 @@ tp_threads_create(tp_p tp, int skip_first) { if (0 != tp->shutdown) return (EBUSY); - for (size_t i = ((0 != skip_first) ? 1 : 0); i < tp->threads_max; i ++) { + for (size_t i = ((0 != skip_first) ? 1 : 0); i < tp->s.threads_max; i ++) { tpt = &tp->threads[i]; if (NULL == tpt->tp) continue; @@ -1182,7 +1212,7 @@ tp_thread_proc(void *data) { tpt->state = TP_THREAD_STATE_RUNNING; snprintf(thr_name, sizeof(thr_name), "%s: %zu", - tpt->tp->name, tpt->thread_num); + tpt->tp->s.name, tpt->thread_num); pthread_self_name_set(thr_name); pthread_setspecific(tp_tls_key_tpt, (const void*)tpt); syslog(LOG_INFO, "%s thread started...", thr_name); @@ -1192,7 +1222,7 @@ tp_thread_proc(void *data) { if (0 != pthread_sigmask(SIG_BLOCK, &sig_set, NULL)) { SYSLOG_ERR(LOG_WARNING, errno, "%s: Can't block the SIGPIPE signal for thread %zu.", - tpt->tp->name, tpt->thread_num); + tpt->tp->s.name, tpt->thread_num); } #ifndef DARWIN @@ -1204,17 +1234,25 @@ tp_thread_proc(void *data) { if (0 == pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cs)) { syslog(LOG_INFO, "%s: bind thread %zu to CPU %i.", - tpt->tp->name, tpt->thread_num, tpt->cpu_id); + tpt->tp->s.name, tpt->thread_num, tpt->cpu_id); } else { SYSLOG_ERR(LOG_WARNING, errno, "%s: can't Bind thread %zu to CPU %i.", - tpt->tp->name, tpt->thread_num, tpt->cpu_id); + tpt->tp->s.name, tpt->thread_num, tpt->cpu_id); } } #endif + if (NULL != tpt->tp->s.tpt_on_start) { + tpt->tp->s.tpt_on_start(tpt); + } + tpt_loop(tpt); + if (NULL != tpt->tp->s.tpt_on_stop) { + tpt->tp->s.tpt_on_stop(tpt); + } + syslog(LOG_INFO, "%s thread exited...", thr_name); pthread_setspecific(tp_tls_key_tpt, NULL); pthread_self_name_set(NULL); @@ -1232,7 +1270,7 @@ tp_thread_count_max_get(tp_p tp) { if (NULL == tp) return (0); - return (tp->threads_max); + return (tp->s.threads_max); } size_t @@ -1241,7 +1279,7 @@ tp_thread_count_get(tp_p tp) { if (NULL == tp) return (0); - for (i = 0, cnt = 0; i < tp->threads_max; i ++) { + for (i = 0, cnt = 0; i < tp->s.threads_max; i ++) { if (0 == tpt_is_running(&tp->threads[i])) continue; cnt ++; @@ -1250,31 +1288,24 @@ tp_thread_count_get(tp_p tp) { } -tpt_p -tp_thread_get_current(void) { - /* TLS magic. */ - return ((tpt_p)pthread_getspecific(tp_tls_key_tpt)); -} - int tp_thread_is_tp_thr(tp_p tp, tpt_p tpt) { if (NULL == tp) return (0); if (NULL == tpt) { - tpt = tp_thread_get_current(); + tpt = tpt_get_current(); } return ((NULL != tpt && tpt->tp == tp)); } tpt_p -tp_thread_get(tp_p tp, size_t thread_num) { +tp_thread_get(tp_p tp, const size_t thread_num) { if (NULL == tp) return (NULL); - if (tp->threads_max <= thread_num) { - thread_num = (tp->threads_max - 1); - } + if (tp->s.threads_max <= thread_num) + return (NULL); return (&tp->threads[thread_num]); } @@ -1284,7 +1315,7 @@ tp_thread_get_rr(tp_p tp) { if (NULL == tp) return (NULL); tp->rr_idx ++; - if (tp->threads_max <= tp->rr_idx) { + if (tp->s.threads_max <= tp->rr_idx) { tp->rr_idx = 0; } return (&tp->threads[tp->rr_idx]); @@ -1296,11 +1327,19 @@ tp_thread_get_pvt(tp_p tp) { if (NULL == tp) return (NULL); - return (tp->pvt /* tp->threads[0] */); + return (tp->pvt); +} + + + +tpt_p +tpt_get_current(void) { + /* TLS magic. */ + return ((tpt_p)pthread_getspecific(tp_tls_key_tpt)); } int -tp_thread_get_cpu_id(tpt_p tpt) { +tpt_get_cpu_id(tpt_p tpt) { if (NULL == tpt) return (-1); @@ -1308,15 +1347,13 @@ tp_thread_get_cpu_id(tpt_p tpt) { } size_t -tp_thread_get_num(tpt_p tpt) { +tpt_get_num(tpt_p tpt) { if (NULL == tpt) return ((size_t)-1); return (tpt->thread_num); } - - tp_p tpt_get_tp(tpt_p tpt) { @@ -1342,6 +1379,37 @@ tpt_get_msg_queue(tpt_p tpt) { return (tpt->msg_queue); } +int +tpt_tls_set(tpt_p tpt, const size_t index, void *val) { + + if (NULL == tpt || + TP_TPT_TLS_COUNT <= index) + return (EINVAL); + + tpt->tls[index] = val; + + return (0); +} + +void * +tpt_tls_get(tpt_p tpt, const size_t index) { + + if (NULL == tpt || + TP_TPT_TLS_COUNT <= index) + return (NULL); + + return (tpt->tls[index]); +} +size_t +tpt_tls_get_sz(tpt_p tpt, const size_t index) { + + if (NULL == tpt || + TP_TPT_TLS_COUNT <= index) + return (0); + + return ((size_t)tpt->tls[index]); +} + int tpt_data_init(tp_p tp, int cpu_id, size_t thread_num, tpt_p tpt) { diff --git a/src/threadpool/threadpool_msg_sys.c b/src/threadpool/threadpool_msg_sys.c index 48c2919..b7cb723 100644 --- a/src/threadpool/threadpool_msg_sys.c +++ b/src/threadpool/threadpool_msg_sys.c @@ -286,7 +286,7 @@ tpt_msg_send(tpt_p dst, tpt_p src, uint32_t flags, return (EINVAL); if (0 != (TP_MSG_F_SELF_DIRECT & flags)) { if (NULL == src) { - src = tp_thread_get_current(); + src = tpt_get_current(); } if (src == dst) { /* Self. */ msg_cb(dst, udata); @@ -307,7 +307,6 @@ tpt_msg_send(tpt_p dst, tpt_p src, uint32_t flags, if (sizeof(msg) == write(msg_queue->fd[1], &msg, sizeof(msg))) return (0); /* Error. */ - debugd_break(); if (0 != (TP_MSG_F_FAIL_DIRECT & flags)) { msg_cb(dst, udata); return (0); @@ -378,7 +377,7 @@ tpt_msg_bsend_ex(tp_p tp, tpt_p src, uint32_t flags, goto err_out; } if (NULL == src) { - src = tp_thread_get_current(); + src = tpt_get_current(); } /* 1 thread specific. */ if (1 == threads_max && @@ -482,7 +481,7 @@ tpt_msg_cbsend(tp_p tp, tpt_p src, uint32_t flags, 0 != ((TP_BMSG_F_SYNC | TP_BMSG_F_SYNC_USLEEP) & flags)) return (EINVAL); if (NULL == src) { - src = tp_thread_get_current(); + src = tpt_get_current(); } if (NULL == src) /* Cant do final callback. */ return (EINVAL); @@ -549,7 +548,7 @@ tpt_msg_async_op_alloc(tpt_p dst, tpt_msg_async_op_cb op_cb) { if (NULL == aop) return (NULL); if (NULL == dst) { - dst = tp_thread_get_current(); + dst = tpt_get_current(); } aop->tpt = dst; aop->op_cb = op_cb; diff --git a/src/utils/ring_buffer.c b/src/utils/ring_buffer.c index 1b9e1d7..cb8db1c 100644 --- a/src/utils/ring_buffer.c +++ b/src/utils/ring_buffer.c @@ -688,7 +688,7 @@ r_buf_rpos_inc(r_buf_p r_buf, r_buf_rpos_p rpos, size_t data_size) { data_size -= r_buf->iov[rpos->iov_index].iov_len; if (0 == r_buf_rpos_index_inc(r_buf, rpos)) { debug_break(); - return; /* XXX this situation is BUG and must newer happen. */ + return; /* XXX this situation is BUG and must never happen. */ } } } diff --git a/tests/threadpool/main.c b/tests/threadpool/main.c index f18a27b..268dcd4 100644 --- a/tests/threadpool/main.c +++ b/tests/threadpool/main.c @@ -69,6 +69,8 @@ static tp_p tp = NULL; static size_t threads_count; static int pipe_fd[2] = {-1, -1}; static uint8_t thr_arr[(THREADS_COUNT_MAX + 4)]; +static uint8_t thr_tls_arr[(THREADS_COUNT_MAX + 4)]; +static size_t thr_flood_arr[(THREADS_COUNT_MAX + 4)]; static int init_suite(void); static int clean_suite(void); @@ -76,16 +78,20 @@ static int clean_suite(void); static void test_tp_init1(void); static void test_tp_init16(void); static void test_tp_destroy(void); +static void test_tp_tpt_hooks(void); static void test_tp_threads_create(void); +static void test_tp_udata_get(void); +static void test_tp_udata_set(void); static void test_tp_thread_count_max_get(void); static void test_tp_thread_count_get(void); static void test_tp_thread_is_tp_thr(void); static void test_tp_thread_get(void); -static void test_tp_thread_get_current(void); static void test_tp_thread_get_rr(void); static void test_tp_thread_get_pvt(void); -static void test_tp_thread_get_cpu_id(void); +static void test_tpt_get_current(void); +static void test_tpt_get_cpu_id(void); static void test_tpt_get_tp(void); +static void test_tpt_get_msg_queue(void); static void test_tpt_msg_send(void); static void test_tpt_msg_bsend_ex1(void); static void test_tpt_msg_bsend_ex2(void); @@ -114,6 +120,7 @@ static void test_tpt_ev_add_ex_tmr_dispatch(void); #ifdef TP_F_EDGE static void test_tpt_ev_add_ex_tmr_edge(void); #endif +static void test_tp_pvt_msg_flood(void); @@ -146,15 +153,18 @@ main(int argc __unused, char *argv[] __unused) { /* Add the tests to the suite. */ if (NULL == CU_add_test(psuite, "test of test_tp_init1() - threads count = 1", test_tp_init1) || NULL == CU_add_test(psuite, "test of tp_threads_create()", test_tp_threads_create) || + NULL == CU_add_test(psuite, "test of tp_udata_get()", test_tp_udata_get) || + NULL == CU_add_test(psuite, "test of tp_udata_set()", test_tp_udata_set) || NULL == CU_add_test(psuite, "test of tp_thread_count_max_get()", test_tp_thread_count_max_get) || NULL == CU_add_test(psuite, "test of tp_thread_count_get()", test_tp_thread_count_get) || NULL == CU_add_test(psuite, "test of tp_thread_is_tp_thr()", test_tp_thread_is_tp_thr) || NULL == CU_add_test(psuite, "test of tp_thread_get()", test_tp_thread_get) || - NULL == CU_add_test(psuite, "test of tp_thread_get_current()", test_tp_thread_get_current) || NULL == CU_add_test(psuite, "test of tp_thread_get_rr()", test_tp_thread_get_rr) || NULL == CU_add_test(psuite, "test of tp_thread_get_pvt()", test_tp_thread_get_pvt) || - NULL == CU_add_test(psuite, "test of tp_thread_get_cpu_id()", test_tp_thread_get_cpu_id) || + NULL == CU_add_test(psuite, "test of tpt_get_current()", test_tpt_get_current) || + NULL == CU_add_test(psuite, "test of tpt_get_cpu_id()", test_tpt_get_cpu_id) || NULL == CU_add_test(psuite, "test of tpt_get_tp()", test_tpt_get_tp) || + NULL == CU_add_test(psuite, "test of tpt_get_msg_queue()", test_tpt_get_msg_queue) || NULL == CU_add_test(psuite, "test of tpt_msg_send()", test_tpt_msg_send) || NULL == CU_add_test(psuite, "test of tpt_msg_bsend_ex(0)", test_tpt_msg_bsend_ex1) || NULL == CU_add_test(psuite, "test of tpt_msg_bsend_ex(TP_BMSG_F_SYNC)", test_tpt_msg_bsend_ex2) || @@ -179,19 +189,24 @@ main(int argc __unused, char *argv[] __unused) { #ifdef TP_F_EDGE NULL == CU_add_test(psuite, "test of tpt_ev_add_args(TP_EV_TIMER, TP_F_EDGE)", test_tpt_ev_add_ex_tmr_edge) || #endif + NULL == CU_add_test(psuite, "test of test_tp_pvt_msg_flood()", test_tp_pvt_msg_flood) || NULL == CU_add_test(psuite, "test of test_tp_destroy()", test_tp_destroy) || + NULL == CU_add_test(psuite, "test of test_tp_tpt_hooks()", test_tp_tpt_hooks) || 0 || NULL == CU_add_test(psuite, "test of test_tp_init16() - threads count = 16", test_tp_init16) || NULL == CU_add_test(psuite, "test of tp_threads_create()", test_tp_threads_create) || + NULL == CU_add_test(psuite, "test of tp_udata_get()", test_tp_udata_get) || + NULL == CU_add_test(psuite, "test of tp_udata_set()", test_tp_udata_set) || NULL == CU_add_test(psuite, "test of tp_thread_count_max_get()", test_tp_thread_count_max_get) || NULL == CU_add_test(psuite, "test of tp_thread_count_get()", test_tp_thread_count_get) || NULL == CU_add_test(psuite, "test of tp_thread_is_tp_thr()", test_tp_thread_is_tp_thr) || NULL == CU_add_test(psuite, "test of tp_thread_get()", test_tp_thread_get) || - NULL == CU_add_test(psuite, "test of tp_thread_get_current()", test_tp_thread_get_current) || NULL == CU_add_test(psuite, "test of tp_thread_get_rr()", test_tp_thread_get_rr) || NULL == CU_add_test(psuite, "test of tp_thread_get_pvt()", test_tp_thread_get_pvt) || - NULL == CU_add_test(psuite, "test of tp_thread_get_cpu_id()", test_tp_thread_get_cpu_id) || + NULL == CU_add_test(psuite, "test of tpt_get_current()", test_tpt_get_current) || + NULL == CU_add_test(psuite, "test of tpt_get_cpu_id()", test_tpt_get_cpu_id) || NULL == CU_add_test(psuite, "test of tpt_get_tp()", test_tpt_get_tp) || + NULL == CU_add_test(psuite, "test of tpt_get_msg_queue()", test_tpt_get_msg_queue) || NULL == CU_add_test(psuite, "test of tpt_msg_send()", test_tpt_msg_send) || NULL == CU_add_test(psuite, "test of tpt_msg_bsend_ex(0)", test_tpt_msg_bsend_ex1) || NULL == CU_add_test(psuite, "test of tpt_msg_bsend_ex(TP_BMSG_F_SYNC)", test_tpt_msg_bsend_ex2) || @@ -216,7 +231,9 @@ main(int argc __unused, char *argv[] __unused) { #ifdef TP_F_EDGE NULL == CU_add_test(psuite, "test of tpt_ev_add_args(TP_EV_TIMER, TP_F_EDGE)", test_tpt_ev_add_ex_tmr_edge) || #endif + NULL == CU_add_test(psuite, "test of test_tp_pvt_msg_flood()", test_tp_pvt_msg_flood) || NULL == CU_add_test(psuite, "test of test_tp_destroy()", test_tp_destroy) || + NULL == CU_add_test(psuite, "test of test_tp_tpt_hooks()", test_tp_tpt_hooks) || 0) { goto err_out; } @@ -227,7 +244,7 @@ main(int argc __unused, char *argv[] __unused) { printf("\n"); CU_basic_show_failures(CU_get_failure_list()); printf("\n\n"); - error = CU_get_number_of_tests_failed(); + error = (int)CU_get_number_of_tests_failed(); /* Run all tests using the automated interface. */ //CU_automated_run_tests(); @@ -279,16 +296,38 @@ clean_suite(void) { } +static void +tpt_hook_start_cb(tpt_p tpt) { + size_t tpt_num = tpt_get_num(tpt); + + thr_tls_arr[tpt_num] ++; + CU_ASSERT(0 == tpt_tls_set(tpt, 0, (void*)tpt_num)) + CU_ASSERT(0 == tpt_tls_set(tpt, 1, tpt)) +} static void -test_tp_init1(void) { +tpt_hook_stop_cb(tpt_p tpt) { + size_t tpt_num = tpt_get_num(tpt); + + thr_tls_arr[tpt_num] ++; + CU_ASSERT(tpt_num == tpt_tls_get_sz(tpt, 0)) + CU_ASSERT(tpt == tpt_tls_get(tpt, 1)) +} + +static void +test_tp_init(const size_t thr_cnt) { int error; tp_settings_t s; + memset(&thr_tls_arr, 0x00, sizeof(thr_tls_arr)); + tp_settings_def(&s); - threads_count = 1; - s.threads_max = 1; + threads_count = thr_cnt; + s.threads_max = thr_cnt; s.flags = (TP_S_F_BIND2CPU); + s.tpt_on_start = tpt_hook_start_cb; + s.tpt_on_stop = tpt_hook_stop_cb; + s.udata = &thr_arr; error = tp_create(&s, &tp); CU_ASSERT(0 == error) if (0 != error) @@ -297,6 +336,16 @@ test_tp_init1(void) { test_sleep(1500); } +static void +test_tp_init1(void) { + test_tp_init(1); +} + +static void +test_tp_init16(void) { + test_tp_init(THREADS_COUNT_MAX); +} + static void test_tp_destroy(void) { @@ -308,18 +357,25 @@ test_tp_destroy(void) { } static void -test_tp_init16(void) { - int error; - tp_settings_t s; +test_tp_tpt_hooks(void) { - tp_settings_def(&s); - threads_count = THREADS_COUNT_MAX; - s.threads_max = THREADS_COUNT_MAX; - s.flags = (TP_S_F_BIND2CPU); - error = tp_create(&s, &tp); - CU_ASSERT(0 == error) - if (0 != error) - return; + for (size_t i = 0; i <= threads_count; i ++) { /* Include PVT. */ + CU_ASSERT(2 == thr_tls_arr[i]) + } + CU_PASS("test_tp_tpt_hooks()") +} + +static void +test_tp_udata_get(void) { + + CU_ASSERT(&thr_arr == tp_udata_get(tp)) +} + +static void +test_tp_udata_set(void) { + + CU_ASSERT(0 == tp_udata_set(tp, NULL)) + CU_ASSERT(NULL == tp_udata_get(tp)) } static void @@ -356,18 +412,12 @@ test_tp_thread_get(void) { size_t i; for (i = 0; i < threads_count; i ++) { - if (i != tp_thread_get_num(tp_thread_get(tp, i))) { - CU_FAIL("tp_thread_get_num()") + if (i != tpt_get_num(tp_thread_get(tp, i))) { + CU_FAIL("tpt_get_num()") return; /* Fail. */ } } - CU_PASS("tp_thread_get_num()") -} - -static void -test_tp_thread_get_current(void) { - - CU_ASSERT(NULL == tp_thread_get_current()) + CU_PASS("tpt_get_num()") } static void @@ -383,9 +433,15 @@ test_tp_thread_get_pvt(void) { } static void -test_tp_thread_get_cpu_id(void) { +test_tpt_get_current(void) { + + CU_ASSERT(NULL == tpt_get_current()) +} + +static void +test_tpt_get_cpu_id(void) { - CU_ASSERT(0 == tp_thread_get_cpu_id(tp_thread_get(tp, 0))) + CU_ASSERT(0 == tpt_get_cpu_id(tp_thread_get(tp, 0))) } static void @@ -394,14 +450,20 @@ test_tpt_get_tp(void) { CU_ASSERT(tp == tpt_get_tp(tp_thread_get(tp, 0))) } +static void +test_tpt_get_msg_queue(void) { + + CU_ASSERT(NULL != tpt_get_msg_queue(tp_thread_get(tp, 0))) +} + static void msg_send_cb(tpt_p tpt, void *udata) { - CU_ASSERT((size_t)udata == tp_thread_get_num(tpt)) + CU_ASSERT((size_t)udata == tpt_get_num(tpt)) - if ((size_t)udata == tp_thread_get_num(tpt)) { + if ((size_t)udata == tpt_get_num(tpt)) { thr_arr[(size_t)udata] = (((size_t)udata) & 0xff); } } @@ -435,9 +497,9 @@ msg_bsend_cb(tpt_p tpt, void *udata) { CU_ASSERT(udata == (void*)tpt_get_tp(tpt)) if (udata == (void*)tpt_get_tp(tpt)) { - thr_arr[tp_thread_get_num(tpt)] = (uint8_t)tp_thread_get_num(tpt); + thr_arr[tpt_get_num(tpt)] = (uint8_t)tpt_get_num(tpt); } else { - thr_arr[tp_thread_get_num(tpt)] = 0xff; + thr_arr[tpt_get_num(tpt)] = 0xff; } } static void @@ -531,7 +593,7 @@ msg_cbsend_cb(tpt_p tpt, void *udata) { CU_ASSERT(udata == (void*)tpt_get_tp(tpt)) if (udata == (void*)tpt_get_tp(tpt)) { - thr_arr[tp_thread_get_num(tpt)] = (uint8_t)tp_thread_get_num(tpt); + thr_arr[tpt_get_num(tpt)] = (uint8_t)tpt_get_num(tpt); } } static void @@ -812,3 +874,42 @@ test_tpt_ev_add_ex_tmr_edge(void) { test_tpt_ev_add_ex_tmr(TP_F_EDGE, TEST_EV_CNT_MAX, 1); } #endif + +static void +msg_send_pvt_msg_flood_cb(tpt_p tpt __unused, void *udata) { + size_t tpt_num = tpt_get_num(tpt_get_current()); /* Get real thread. */ + + CU_ASSERT(NULL == udata) + + thr_flood_arr[tpt_num] ++; +} +static void +test_tp_pvt_msg_flood(void) { + int error; + tpt_p tpt_pvt = tp_thread_get_pvt(tp); + size_t i, fired_cnt = 0; + const size_t fired_cnt_target = (threads_count * 16384); + + CU_ASSERT(NULL != tpt_pvt) + memset(&thr_flood_arr, 0x00, sizeof(thr_flood_arr)); + for (i = 0; i < fired_cnt_target; i ++) { + for (;;) { + error = tpt_msg_send(tpt_pvt, NULL, + 0, msg_send_pvt_msg_flood_cb, NULL); + if (EAGAIN != error) + break; + test_sleep(1); + } + if (0 != error) { + CU_FAIL("tpt_msg_send()") + return; /* Fail. */ + } + } + /* Wait for all threads process. */ + test_sleep(TEST_SLEEP_TIME_MS * 2); + for (i = 0; i < threads_count; i ++) { + fired_cnt += thr_flood_arr[i]; + } + CU_ASSERT(fired_cnt == fired_cnt_target) + CU_PASS("test_tp_pvt_msg_flood()") +} diff --git a/tests/threadpool/test-threadpool.project b/tests/threadpool/test-threadpool.project index 313c625..8b4e5bd 100644 --- a/tests/threadpool/test-threadpool.project +++ b/tests/threadpool/test-threadpool.project @@ -10,7 +10,7 @@ - +