Skip to content

Commit

Permalink
fixes #530 POSIX pollqs should scale horizontally (epoll)
Browse files Browse the repository at this point in the history
This should help Linux platforms scale even further with NNG.

Port events and *maybe* poll are the last to do.  Probably select
will remain left in the cold, because honestly select based systems
are already performance constrained.
  • Loading branch information
gdamore committed Dec 31, 2024
1 parent 0fb3318 commit 90faa53
Showing 1 changed file with 66 additions and 28 deletions.
94 changes: 66 additions & 28 deletions src/platform/posix/posix_pollq_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,21 @@ typedef struct nni_posix_pollq {
int epfd; // epoll handle
int evfd; // event fd (to wake us for other stuff)
bool close; // request for worker to exit
nni_thr thr; // worker thread
bool init;
nni_thr thr; // worker thread
nni_list reapq;
} nni_posix_pollq;

// single global instance for now.
static nni_posix_pollq nni_posix_global_pollq;
static nni_posix_pollq *nni_epoll_pqs;
static int nni_epoll_npq;

void
nni_posix_pfd_init(nni_posix_pfd *pfd, int fd, nni_posix_pfd_cb cb, void *arg)
{
nni_posix_pollq *pq;

pq = &nni_posix_global_pollq;
pq = &nni_epoll_pqs[fd % nni_epoll_npq];

(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
(void) fcntl(fd, F_SETFL, O_NONBLOCK);
Expand Down Expand Up @@ -191,7 +193,7 @@ nni_posix_pollq_reap(nni_posix_pollq *pq)
}

static void
nni_posix_poll_thr(void *arg)
nni_epoll_thr(void *arg)
{
nni_posix_pollq *pq = arg;
struct epoll_event events[NNI_MAX_EPOLL_EVENTS];
Expand Down Expand Up @@ -245,30 +247,32 @@ nni_posix_poll_thr(void *arg)
}

static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
nni_epoll_pq_destroy(nni_posix_pollq *pq)
{
uint64_t one = 1;

nni_mtx_lock(&pq->mtx);
pq->close = true;
if (pq->init) {
nni_mtx_lock(&pq->mtx);
pq->close = true;

if (write(pq->evfd, &one, sizeof(one)) != sizeof(one)) {
// This should never occur, and if it does it could
// lead to a hang.
nni_panic("BUG! unable to write to evfd!");
}
nni_mtx_unlock(&pq->mtx);
if (write(pq->evfd, &one, sizeof(one)) != sizeof(one)) {
// This should never occur, and if it does it could
// lead to a hang.
nni_panic("BUG! unable to write to evfd!");

Check warning on line 261 in src/platform/posix/posix_pollq_epoll.c

View check run for this annotation

Codecov / codecov/patch

src/platform/posix/posix_pollq_epoll.c#L261

Added line #L261 was not covered by tests
}
nni_mtx_unlock(&pq->mtx);

nni_thr_fini(&pq->thr);
nni_thr_fini(&pq->thr);

close(pq->evfd);
close(pq->epfd);
close(pq->evfd);
close(pq->epfd);

nni_mtx_fini(&pq->mtx);
nni_mtx_fini(&pq->mtx);
}
}

static int
nni_posix_pollq_add_eventfd(nni_posix_pollq *pq)
nni_epoll_pq_add_eventfd(nni_posix_pollq *pq)
{
// add event fd so we can wake ourself on exit
struct epoll_event ev;
Expand All @@ -295,10 +299,16 @@ nni_posix_pollq_add_eventfd(nni_posix_pollq *pq)
}

static int
nni_posix_pollq_create(nni_posix_pollq *pq)
nni_epoll_pq_create(nni_posix_pollq *pq)
{
int rv;

NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
nni_mtx_init(&pq->mtx);
nni_cv_init(&pq->cv, &pq->mtx);
pq->epfd = -1;
pq->init = true;

#if NNG_HAVE_EPOLL_CREATE1
if ((pq->epfd = epoll_create1(EPOLL_CLOEXEC)) < 0) {
return (nni_plat_errno(errno));
Expand All @@ -314,16 +324,12 @@ nni_posix_pollq_create(nni_posix_pollq *pq)

pq->close = false;

NNI_LIST_INIT(&pq->reapq, nni_posix_pfd, node);
nni_mtx_init(&pq->mtx);
nni_cv_init(&pq->cv, &pq->mtx);

if ((rv = nni_posix_pollq_add_eventfd(pq)) != 0) {
if ((rv = nni_epoll_pq_add_eventfd(pq)) != 0) {
(void) close(pq->epfd);
nni_mtx_fini(&pq->mtx);
return (rv);
}
if ((rv = nni_thr_init(&pq->thr, nni_posix_poll_thr, pq)) != 0) {
if ((rv = nni_thr_init(&pq->thr, nni_epoll_thr, pq)) != 0) {
(void) close(pq->epfd);
(void) close(pq->evfd);
nni_mtx_fini(&pq->mtx);
Expand All @@ -337,14 +343,46 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
int
nni_posix_pollq_sysinit(nng_init_params *params)
{
NNI_ARG_UNUSED(params);
return (nni_posix_pollq_create(&nni_posix_global_pollq));
int16_t num_thr;
int16_t max_thr;

max_thr = params->max_poller_threads;
num_thr = params->num_poller_threads;

if ((max_thr > 0) && (num_thr > max_thr)) {
num_thr = max_thr;
}
if (num_thr < 1) {
num_thr = 1;

Check warning on line 356 in src/platform/posix/posix_pollq_epoll.c

View check run for this annotation

Codecov / codecov/patch

src/platform/posix/posix_pollq_epoll.c#L356

Added line #L356 was not covered by tests
}
params->num_poller_threads = num_thr;
if ((nni_epoll_pqs = NNI_ALLOC_STRUCTS(nni_epoll_pqs, num_thr)) ==
NULL) {
return (NNG_ENOMEM);

Check warning on line 361 in src/platform/posix/posix_pollq_epoll.c

View check run for this annotation

Codecov / codecov/patch

src/platform/posix/posix_pollq_epoll.c#L361

Added line #L361 was not covered by tests
}

nni_epoll_npq = num_thr;
for (int i = 0; i < num_thr; i++) {
int rv;
if ((rv = nni_epoll_pq_create(&nni_epoll_pqs[i])) != 0) {
nni_posix_pollq_sysfini();
return (rv);

Check warning on line 369 in src/platform/posix/posix_pollq_epoll.c

View check run for this annotation

Codecov / codecov/patch

src/platform/posix/posix_pollq_epoll.c#L368-L369

Added lines #L368 - L369 were not covered by tests
}
}
return (0);
}

void
nni_posix_pollq_sysfini(void)
{
nni_posix_pollq_destroy(&nni_posix_global_pollq);
if (nni_epoll_npq > 0) {
for (int i = 0; i < nni_epoll_npq; i++) {
nni_epoll_pq_destroy(&nni_epoll_pqs[i]);
}
NNI_FREE_STRUCTS(nni_epoll_pqs, nni_epoll_npq);
nni_epoll_pqs = NULL;
nni_epoll_npq = 0;
}
}

#endif // NNG_HAVE_EPOLL

0 comments on commit 90faa53

Please sign in to comment.