Skip to content

Commit

Permalink
port events poller (illumos/Solaris): use atomic events mask
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Dec 23, 2024
1 parent be379a2 commit e3c017d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 66 deletions.
112 changes: 50 additions & 62 deletions src/platform/posix/posix_pollq_port.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
typedef struct nni_posix_pollq {
int port; // port id (from port_create)
nni_thr thr; // worker thread
nni_mtx mtx;
nni_cv cv;
} nni_posix_pollq;

// single global instance for now
Expand All @@ -45,15 +47,13 @@ nni_posix_pfd_init(nni_posix_pfd *pfdp, int fd, nni_posix_pfd_cb cb, void *arg)
(void) fcntl(fd, F_SETFD, FD_CLOEXEC);
(void) fcntl(fd, F_SETFL, O_NONBLOCK);

nni_mtx_init(&pfd->mtx);
nni_cv_init(&pfd->cv, &pfd->mtx);
pfd->closed = false;
pfd->closing = false;
pfd->fd = fd;
pfd->pq = pq;
pfd->cb = cb;
pfd->arg = arg;
pfd->data = NULL;
nni_atomic_init(&pfd->events);
pfd->closed = false;
pfd->fd = fd;
pfd->pq = pq;
pfd->cb = cb;
pfd->arg = arg;
pfd->data = NULL;
}

int
Expand All @@ -71,13 +71,8 @@ nni_posix_pfd_close(nni_posix_pfd *pfd)
return;
}

nni_mtx_lock(&pfd->mtx);
if (!pfd->closing) {
pfd->closing = true;
(void) shutdown(pfd->fd, SHUT_RDWR);
port_dissociate(pq->port, PORT_SOURCE_FD, pfd->fd);
}
nni_mtx_unlock(&pfd->mtx);
(void) shutdown(pfd->fd, SHUT_RDWR);
port_dissociate(pq->port, PORT_SOURCE_FD, pfd->fd);

// Send the wake event to the poller to synchronize with it.
// Note that port_send should only really fail if out of memory
Expand All @@ -102,11 +97,11 @@ nni_posix_pfd_stop(nni_posix_pfd *pfd)
}
sched_yield(); // try again later...
}
nni_mtx_lock(&pfd->mtx);
nni_mtx_lock(&pq->mtx);
while (!pfd->closed) {
nni_cv_wait(&pfd->cv);
nni_cv_wait(&pq->cv);
}
nni_mtx_unlock(&pfd->mtx);
nni_mtx_unlock(&pq->mtx);
}

void
Expand All @@ -122,27 +117,21 @@ nni_posix_pfd_fini(nni_posix_pfd *pfd)

// We're exclusive now.
(void) close(pfd->fd);
nni_cv_fini(&pfd->cv);
nni_mtx_fini(&pfd->mtx);
}

int
nni_posix_pfd_arm(nni_posix_pfd *pfd, unsigned events)
{
nni_posix_pollq *pq = pfd->pq;
int rv;
int ev = (int) events;

nni_mtx_lock(&pfd->mtx);
if (!pfd->closing) {
pfd->events |= events;
if (port_associate(pq->port, PORT_SOURCE_FD, pfd->fd,
(int) pfd->events, pfd) != 0) {
int rv = nni_plat_errno(errno);
nni_mtx_unlock(&pfd->mtx);
return (rv);
}
ev |= ni_atomic_or(&pfd->events, ev);
rv = port_associate(pq->port, PORT_SOURCE_FD, pfd->fd, ev, pfd);
if (rv != 0) {
nni_plat_errno(errno);
}
nni_mtx_unlock(&pfd->mtx);
return (0);
return (rv);
}

static void
Expand All @@ -152,7 +141,7 @@ nni_posix_poll_thr(void *arg)
nni_posix_pollq *pq = arg;
port_event_t ev[NNI_MAX_PORTEV];
nni_posix_pfd *pfd;
unsigned events;
int events;
nni_posix_pfd_cb cb;
void *arg;
unsigned n;
Expand All @@ -168,40 +157,35 @@ nni_posix_poll_thr(void *arg)
// We run through the returned ports twice. First we
// get the callbacks. Then we do the reaps. This way
// we ensure that we only reap *after* callbacks have run.
bool user_wake = false;
for (unsigned i = 0; i < n; i++) {
if (ev[i].portev_source != PORT_SOURCE_FD) {
switch (ev[i].portev_source) {
case PORT_SOURCE_USER:
user_wake = true;
continue;
}
pfd = ev[i].portev_user;
events = ev[i].portev_events;

nni_mtx_lock(&pfd->mtx);
cb = pfd->cb;
arg = pfd->data;
pfd->events &= ~events;
nni_mtx_unlock(&pfd->mtx);

if (cb != NULL) {
cb(pfd, events, arg);
case PORT_SOURCE_FD:
if (ev[i].portev_source != PORT_SOURCE_FD) {
continue;
}
pfd = ev[i].portev_user;
events = ev[i].portev_events;

cb = pfd->cb;
arg = pfd->data;
nni_atomic_and(&pfd->events, ~events);

cb(pfd, (unsigned) events, arg);
}
}
for (unsigned i = 0; i < n; i++) {
if (ev[i].portev_source != PORT_SOURCE_USER) {
continue;
if (user_wake) {
nni_mtx_lock(&pq->mtx);
for (unsigned i = 0; i < n; i++) {
if (ev[i].portev_source == PORT_SOURCE_USER) {
pfd->closed = true;
}
}

// User event telling us to stop doing things.
// We signal back to use this as a coordination
// event between the pollq and the thread
// handler. NOTE: It is absolutely critical
// that there is only a single thread per
// pollq. Otherwise we cannot be sure that we
// are blocked completely,
pfd = ev[i].portev_user;
nni_mtx_lock(&pfd->mtx);
pfd->closed = true;
nni_cv_wake(&pfd->cv);
nni_mtx_unlock(&pfd->mtx);
nni_cv_wake(&pq->cv);
nni_mtx_unlock(&pq->mtx);
}
}
}
Expand All @@ -210,6 +194,8 @@ static void
nni_posix_pollq_destroy(nni_posix_pollq *pq)
{
(void) close(pq->port);
nni_cv_destroy(&pq->cv);
nni_mtx_fini(&pq->mtx);
nni_thr_fini(&pq->thr);
}

Expand All @@ -228,6 +214,8 @@ nni_posix_pollq_create(nni_posix_pollq *pq)
}
nni_thr_set_name(&pq->thr, "nng:poll:port");

nni_mtx_init(&pq->mtx);
nni_cv_init(&pq->cv, pq->mtx);
nni_thr_run(&pq->thr);
return (0);
}
Expand Down
5 changes: 1 addition & 4 deletions src/platform/posix/posix_pollq_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ typedef struct nni_posix_pollq nni_posix_pollq;
struct nni_posix_pfd {
nni_posix_pollq *pq;
int fd;
nni_mtx mtx;
nni_cv cv;
unsigned events;
nni_atomic_int events;
bool closed;
bool closing;
nni_posix_pfd_cb cb;
void *data;
};
Expand Down

0 comments on commit e3c017d

Please sign in to comment.