Skip to content

Commit

Permalink
threadpool: add TP_EV_PROC to track process exit; improve timer handl…
Browse files Browse the repository at this point in the history
…er code.
  • Loading branch information
rozhuk-im committed Oct 6, 2024
1 parent 3a56615 commit f4e0e6f
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 52 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ chk_function_exists(kqueuex)
chk_function_exists(rtprio)
chk_function_exists(pthread_setname_np)
chk_function_exists(pthread_set_name_np)
chk_function_exists(posix_spawn_file_actions_addclosefrom_np)

# Check macros.
chk_symbol_exists(sys/socket.h SOCK_CLOEXEC)
Expand Down
10 changes: 9 additions & 1 deletion include/threadpool/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ typedef struct thread_pool_event_s { /* Thread pool event. */
#define TP_EV_READ 0 /* EVFILT_READ EPOLLIN | EPOLLRDHUP | EPOLLERR */
#define TP_EV_WRITE 1 /* EVFILT_WRITE EPOLLOUT | EPOLLERR */
#define TP_EV_TIMER 2 /* EVFILT_TIMER TP_EV_READ + timerfd_create */
#define TP_EV_LAST TP_EV_TIMER
#define TP_EV_PROC 3 /* EVFILT_PROC TP_EV_READ + pidfd_open */
#define TP_EV_LAST TP_EV_PROC
#define TP_EV_MASK 0x0003u /* For internal use: event set mask. */

/* Event flags. */
Expand All @@ -73,10 +74,12 @@ typedef struct thread_pool_event_s { /* Thread pool event. */
#define TP_F_EOF (((uint16_t)1) << 8) /* Ret: EV_EOF EPOLLRDHUP */
#define TP_F_ERROR (((uint16_t)1) << 9) /* Ret: EV_EOF+fflags EPOLLERR + getsockopt(SO_ERROR) */ /* fflags contain error code. */


/* Event fflags. */
/* TP_EV_READ/TP_EV_WRITE specific. */
#define TP_FF_RW_LOWAT (((uint32_t)1) << 0) /* For sockets: set SO_RCVLOWAT/SO_SNDLOWAT. */
#define TP_FF_RW_MASK 0x00000001u /* For internal use: fflags set mask. */

/* TP_EV_TIMER specific: if not set - the default is seconds. */
/* Data units selection ENUM for timer: select only one. */
#define TP_FF_T_SEC 0x00000000u /* data is seconds. */
Expand All @@ -90,6 +93,11 @@ typedef struct thread_pool_event_s { /* Thread pool event. */

static const char *tp_ff_time_units[] = { "s", "ms", "us", "ns", NULL };

/* TP_EV_PROC specific: if not set - the default is seconds. */
#define TP_FF_P_EXIT (((uint32_t)1) << 0) /* The process has exited. The exit status will be stored in data. */
#define TP_FF_P_MASK 0x00000001u /* For internal use: fflags set mask. */



typedef void (*tpt_hook_cb)(tpt_p tpt);
typedef void (*tp_cb)(tp_event_p ev, tp_udata_p tp_udata);
Expand Down
182 changes: 132 additions & 50 deletions src/threadpool/threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
# include <sys/timerfd.h>
# include <sys/ioctl.h>
# include <sys/socket.h>
# include <sys/syscall.h>
# include <sys/wait.h>
# ifndef PIDFD_NONBLOCK
# define PIDFD_NONBLOCK O_NONBLOCK
# endif
#endif /* Linux specific code. */

#include <sys/queue.h>
Expand Down Expand Up @@ -107,6 +112,7 @@ static const short tp_event_to_kq_map[] = {
EVFILT_READ, /* 0: TP_EV_READ */
EVFILT_WRITE, /* 1: TP_EV_WRITE */
EVFILT_TIMER, /* 2: TP_EV_TIMER */
EVFILT_PROC, /* 3: TP_EV_PROC */
0
};

Expand All @@ -125,6 +131,7 @@ static const uint32_t tp_event_to_ep_map[] = {
EPOLL_IN, /* 0: TP_EV_READ */
EPOLL_OUT, /* 1: TP_EV_WRITE */
0, /* 2: TP_EV_TIMER */
0, /* 3: TP_EV_PROC */
0
};

Expand Down Expand Up @@ -261,6 +268,9 @@ tp_fflags_to_kq(uint16_t event, uint32_t fflags) {
break;
}
break;
case TP_EV_PROC:
ret |= NOTE_EXIT; /* Always set. */
break;
}

return (ret);
Expand Down Expand Up @@ -325,6 +335,7 @@ tpt_ev_post(int op, tp_event_p ev, tp_udata_p tp_udata) {
}
break;
case TP_EV_TIMER: /* Timer: force update. */
case TP_EV_PROC: /* Process: force update. */
if (0 != ((EV_ADD | EV_ENABLE) & kev.flags)) {
kev.flags |= (EV_ADD | EV_ENABLE);
}
Expand Down Expand Up @@ -486,28 +497,33 @@ tpt_loop(tpt_p tpt) {
/* Translate kq event to thread poll event. */
switch (kev.filter) {
case EVFILT_READ:
ev.event = TP_EV_READ;
break;
case EVFILT_WRITE:
ev.event = TP_EV_WRITE;
ev.event = ((EVFILT_READ == kev.filter) ? TP_EV_READ : TP_EV_WRITE);
ev.flags = 0;
if (0 != (EV_EOF & kev.flags)) {
ev.flags |= TP_F_EOF;
if (0 != kev.fflags) { /* For socket: closed, and error present. */
ev.flags |= TP_F_ERROR;
}
}
ev.fflags = (uint32_t)kev.fflags;
break;
case EVFILT_TIMER:
ev.event = TP_EV_TIMER;
ev.flags = 0;
ev.fflags = 0;
break;
case EVFILT_PROC:
ev.event = TP_EV_PROC;
ev.flags = 0;
ev.fflags = TP_FF_P_EXIT;
break;
default:
syslog(LOG_DEBUG, "kevent with invalid filter = %i, ident = %zu.",
kev.filter, kev.ident);
debugd_break();
continue;
}
ev.flags = 0;
if (0 != (EV_EOF & kev.flags)) {
ev.flags |= TP_F_EOF;
if (0 != kev.fflags) { /* For socket: closed, and error present. */
ev.flags |= TP_F_ERROR;
}
}
ev.fflags = (uint32_t)kev.fflags;
ev.data = (uint64_t)kev.data;

tp_udata->cb_func(&ev, tp_udata);
Expand All @@ -521,6 +537,11 @@ tpt_loop(tpt_p tpt) {
#define TP_EV_OTHER(event) \
(TP_EV_READ == (event) ? TP_EV_WRITE : TP_EV_READ)

static int
pidfd_open(pid_t pid, unsigned int flags) {
return syscall(SYS_pidfd_open, pid, flags);
}

/* Translate thread pool flags <-> epoll flags. */
static inline uint32_t
tp_flags_to_ep(const int op, const uint16_t flags) {
Expand Down Expand Up @@ -617,7 +638,7 @@ epoll_ctl_ex(int epfd, int op, int fd, struct epoll_event *event) {

static int
tpt_ev_post(int op, tp_event_p ev, tp_udata_p tp_udata) {
int error = 0, tfd, clockid = CLOCK_MONOTONIC, tmr_flags = 0, op_guess;
int error = 0, tfd, op_guess;
uint32_t lowat;
struct itimerspec new_tmr;
struct epoll_event epev;
Expand All @@ -630,18 +651,19 @@ tpt_ev_post(int op, tp_event_p ev, tp_udata_p tp_udata) {
epev.events = (EPOLLHUP | EPOLLERR);
epev.data.ptr = (void*)tp_udata;

if (TP_EV_TIMER == ev->event) { /* Special handle for timer. */
switch (ev->event) {
case TP_EV_TIMER: /* Special handle for timer. */
tfd = TPDATA_TFD_GET(tp_udata->tpdata);
if (TP_CTL_DEL == op) { /* Delete timer. */
switch (op) {
case TP_CTL_DEL: /* Delete timer. */
if (0 == tfd)
return (ENOENT);
error = 0;
err_out_timer:
close(tfd); /* No need to epoll_ctl(EPOLL_CTL_DEL). */
tp_udata->tpdata = 0;
return (error);
}
if (TP_CTL_DISABLE == op) {
case TP_CTL_DISABLE:
if (0 == tfd)
return (ENOENT);
tp_udata->tpdata |= TPDATA_F_DISABLED;
Expand All @@ -654,13 +676,9 @@ tpt_ev_post(int op, tp_event_p ev, tp_udata_p tp_udata) {
}

/* TP_CTL_ADD, TP_CTL_ENABLE */
if (0 != (TP_FF_T_ABSTIME & ev->fflags)) {
clockid = CLOCK_REALTIME;
tmr_flags = TFD_TIMER_ABSTIME;
}

if (0 == tfd) { /* Create timer, if needed. */
tfd = timerfd_create(clockid,
tfd = timerfd_create(
((0 != (TP_FF_T_ABSTIME & ev->fflags)) ? CLOCK_REALTIME : CLOCK_MONOTONIC),
(TFD_NONBLOCK |
(0 != (TP_S_F_CLOEXEC & tp_udata->tpt->tp->s.flags) ? TFD_CLOEXEC : 0)));
if (-1 == tfd) {
Expand All @@ -669,8 +687,8 @@ tpt_ev_post(int op, tp_event_p ev, tp_udata_p tp_udata) {
}
TPDATA_TFD_SET(tp_udata->tpdata, tfd);
TPDATA_EV_FL_SET(tp_udata->tpdata, ev->event, ev->flags); /* Remember original event and flags. */
/* Adding to epoll. */
epev.events |= EPOLLIN; /* Not set EPOLLONESHOT, control timer. */
/* Add to epoll. */
epev.events |= EPOLLIN; /* Not set EPOLLONESHOT, use timer control. */
if (0 != epoll_ctl((int)tp_udata->tpt->io_fd,
EPOLL_CTL_ADD, tfd, &epev)) {
error = errno;
Expand Down Expand Up @@ -702,11 +720,52 @@ tpt_ev_post(int op, tp_event_p ev, tp_udata_p tp_udata) {
} else { /* Periodic. */
new_tmr.it_interval = new_tmr.it_value; /* memcpy(). */
}
if (-1 == timerfd_settime(tfd, tmr_flags, &new_tmr, NULL)) {
if (-1 == timerfd_settime(tfd,
((0 != (TP_FF_T_ABSTIME & ev->fflags)) ? TFD_TIMER_ABSTIME : 0),
&new_tmr, NULL)) {
error = errno;
goto err_out_timer;
}
return (0);
case TP_EV_PROC:
tfd = TPDATA_TFD_GET(tp_udata->tpdata);
switch (op) {
case TP_CTL_DEL: /* Delete proc. */
case TP_CTL_DISABLE:
if (0 == tfd)
return (ENOENT);
error = 0;
err_out_proc:
close(tfd); /* No need to epoll_ctl(EPOLL_CTL_DEL). */
tp_udata->tpdata = 0;
return (error);
case TP_CTL_ADD: /* Add proc. */
case TP_CTL_ENABLE:
if (0 != tfd)
return (EEXIST);
/* Create pidfd. */
tfd = pidfd_open(tp_udata->ident, PIDFD_NONBLOCK);
if (-1 == tfd)
return (errno);
if (0 != (TP_S_F_CLOEXEC & tp_udata->tpt->tp->s.flags)) {
if (-1 == fcntl(tfd, F_SETFD, (FD_CLOEXEC | O_NONBLOCK))) {
error = errno;
goto err_out_proc;
}
}
tp_udata->tpdata = 0;
TPDATA_TFD_SET(tp_udata->tpdata, tfd);
TPDATA_EV_FL_SET(tp_udata->tpdata, ev->event, ev->flags); /* Remember original event and flags. */
/* Add to epoll. */
epev.events |= (EPOLLIN | EPOLLONESHOT); /* Always oneshot, by design pidfd(). */
if (0 != epoll_ctl((int)tp_udata->tpt->io_fd,
EPOLL_CTL_ADD, tfd, &epev)) {
error = errno;
goto err_out_proc;
}
break;
}
return (0);
}

/* Read/Write events. */
Expand Down Expand Up @@ -818,40 +877,55 @@ tpt_loop(tpt_p tpt) {
if (0 != (TP_F_DISPATCH & tpev_flags)) { /* Mark as disabled. */
tp_udata->tpdata |= TPDATA_F_DISABLED;
}
if (TP_EV_TIMER == ev.event) { /* Timer. */

switch (ev.event) {
case TP_EV_READ:
case TP_EV_WRITE:
/* Read/write. */
if (0 != (EPOLL_HUP & epev.events)) {
ev.flags |= TP_F_EOF;
}
if (0 != (EPOLLERR & epev.events)) { /* Try to get error code. */
ev.flags |= TP_F_ERROR;
ev.fflags = errno;
optlen = sizeof(int);
if (0 == getsockopt((int)tp_udata->ident,
SOL_SOCKET, SO_ERROR, &itm, &optlen)) {
ev.fflags = itm;
}
if (0 == ev.fflags) {
ev.fflags = EINVAL;
}
}
if (0 != (TP_F_ONESHOT & tpev_flags)) { /* Onetime. */
epoll_ctl((int)tpt->io_fd, EPOLL_CTL_DEL,
(int)tp_udata->ident, &epev);
tp_udata->tpdata = 0;
}
ev.data = UINT64_MAX; /* Transfer as many as you can. */
//ioctl((int)tp_udata->ident, FIONREAD, &ev.data);
break;
case TP_EV_TIMER: /* Timer. */
tfd = TPDATA_TFD_GET(tp_udata->tpdata);
itm = read(tfd, &ev.data, sizeof(uint64_t));
if (0 != (TP_F_ONESHOT & tpev_flags)) { /* Onetime. */
close(tfd); /* No need to epoll_ctl(EPOLL_CTL_DEL). */
tp_udata->tpdata = 0;
}
tp_udata->cb_func(&ev, tp_udata);
continue;
}
/* Read/write. */
ev.data = UINT64_MAX; /* Transfer as many as you can. */
//ioctl((int)tp_udata->ident, FIONREAD, &ev.data);
if (0 != (EPOLL_HUP & epev.events)) {
ev.flags |= TP_F_EOF;
}
if (0 != (EPOLLERR & epev.events)) { /* Try to get error code. */
ev.flags |= TP_F_ERROR;
ev.fflags = errno;
optlen = sizeof(int);
if (0 == getsockopt((int)tp_udata->ident,
SOL_SOCKET, SO_ERROR, &itm, &optlen)) {
ev.fflags = itm;
}
if (0 == ev.fflags) {
ev.fflags = EINVAL;
}
}
if (0 != (TP_F_ONESHOT & tpev_flags)) { /* Onetime. */
epoll_ctl((int)tpt->io_fd, EPOLL_CTL_DEL,
(int)tp_udata->ident, &epev);
break;
case TP_EV_PROC: /* Process. */
/* Read exit code. */
itm = 0;
waitpid((pid_t)tp_udata->ident, &itm, WNOHANG);
ev.fflags = TP_FF_P_EXIT;
ev.data = (uint64_t)itm;
/* Close pidfd. */
close(TPDATA_TFD_GET(tp_udata->tpdata)); /* No need to epoll_ctl(EPOLL_CTL_DEL). */
tp_udata->tpdata = 0;
break;
}

/* Do callback. */
tp_udata->cb_func(&ev, tp_udata);
} /* End Main loop. */
return;
Expand Down Expand Up @@ -1476,6 +1550,14 @@ tpt_ev_validate(int op, tp_event_p ev, tp_udata_p tp_udata) {
if (0 != (~(TP_FF_T_MASK) & ev->fflags))
return (EINVAL); /* Invalid fflags: some unknown bits is set. */
break;
case TP_EV_PROC:
#if defined(TP_F_EDGE)
if (0 != (TP_F_EDGE & ev->flags))
return (EINVAL); /* Invalid flags. */
#endif
if (0 != (~(TP_FF_P_MASK) & ev->fflags))
return (EINVAL); /* Invalid fflags: some unknown bits is set. */
break;
default:
return (EINVAL); /* Bad event. */
}
Expand Down
Loading

0 comments on commit f4e0e6f

Please sign in to comment.