Skip to content

Commit

Permalink
inproc: use inline data structures for SP blocks
Browse files Browse the repository at this point in the history
The pair is still a separate allocation, but this overall does
reduce the number of allocations as well as a failure paths.
  • Loading branch information
gdamore committed Dec 16, 2024
1 parent 23fa7c2 commit 0de15f0
Showing 1 changed file with 43 additions and 61 deletions.
104 changes: 43 additions & 61 deletions src/sp/transport/inproc/inproc.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct inproc_pipe {
inproc_queue *send_queue;
uint16_t peer;
uint16_t proto;
nni_pipe *pipe;
};

struct inproc_queue {
Expand All @@ -53,7 +54,8 @@ struct inproc_pair {

struct inproc_ep {
const char *addr;
bool listener;
nni_listener *listener;
nni_dialer *dialer;
nni_list_node node;
uint16_t proto;
nni_cv cv;
Expand Down Expand Up @@ -86,32 +88,16 @@ static void
inproc_pair_destroy(void *arg)
{
inproc_pair *pair = arg;
for (int i = 0; i < 2; i++) {
nni_mtx_fini(&pair->queues[i].lock);
}
nni_mtx_fini(&pair->queues[0].lock);
nni_mtx_fini(&pair->queues[1].lock);
NNI_FREE_STRUCT(pair);
}

static int
inproc_pipe_alloc(inproc_pipe **pipep, inproc_ep *ep)
{
inproc_pipe *pipe;

if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) {
return (NNG_ENOMEM);
}

pipe->proto = ep->proto;
pipe->addr = ep->addr;
*pipep = pipe;
return (0);
}

static int
inproc_pipe_init(void *arg, nni_pipe *p)
{
NNI_ARG_UNUSED(arg);
NNI_ARG_UNUSED(p);
inproc_pipe *pipe = arg;
pipe->pipe = p;
return (0);
}

Expand All @@ -131,8 +117,6 @@ inproc_pipe_fini(void *arg)
// If we are the last peer, then toss the pair structure.
nni_refcnt_rele(&pair->ref);
}

NNI_FREE_STRUCT(pipe);
}

static void
Expand Down Expand Up @@ -287,49 +271,34 @@ inproc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t)
return (nni_copyout_sockaddr(&sa, buf, szp, t));
}

static int
inproc_dialer_init(void **epp, nng_url *url, nni_dialer *ndialer)
static void
inproc_ep_init(inproc_ep *ep, nni_sock *sock, const nng_url *url)
{
inproc_ep *ep;
nni_sock *sock = nni_dialer_sock(ndialer);

if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&ep->mtx);

ep->listener = false;
ep->proto = nni_sock_proto_id(sock);
ep->rcvmax = 0;
ep->proto = nni_sock_proto_id(sock);
ep->rcvmax = 0;
NNI_LIST_INIT(&ep->clients, inproc_ep, node);
nni_aio_list_init(&ep->aios);

ep->addr = url->u_path; // we match on the URL path.
}

static int
inproc_dialer_init(void **epp, nng_url *url, nni_dialer *ndialer)
{
inproc_ep *ep = (void *) epp;

*epp = ep;
ep->dialer = ndialer;
inproc_ep_init(ep, nni_dialer_sock(ndialer), url);
return (0);
}

static int
inproc_listener_init(void **epp, nng_url *url, nni_listener *nlistener)
{
inproc_ep *ep;
nni_sock *sock = nni_listener_sock(nlistener);

if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&ep->mtx);

ep->listener = true;
ep->proto = nni_sock_proto_id(sock);
ep->rcvmax = 0;
NNI_LIST_INIT(&ep->clients, inproc_ep, node);
nni_aio_list_init(&ep->aios);

ep->addr = url->u_path; // we match on the path
inproc_ep *ep = (void *) epp;

*epp = ep;
ep->listener = nlistener;
inproc_ep_init(ep, nni_listener_sock(nlistener), url);
return (0);
}

Expand All @@ -344,20 +313,19 @@ inproc_ep_fini(void *arg)
{
inproc_ep *ep = arg;
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
}

static void
inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe)
{
nni_aio_list_remove(aio);

if ((!ep->listener) && nni_list_empty(&ep->aios)) {
if ((ep->listener == NULL) && nni_list_empty(&ep->aios)) {
nni_list_node_remove(&ep->node);
}

if (rv == 0) {
nni_aio_set_output(aio, 0, pipe);
nni_aio_set_output(aio, 0, pipe->pipe);
nni_aio_finish(aio, 0, 0);
} else {
NNI_ASSERT(pipe == NULL);
Expand Down Expand Up @@ -427,21 +395,32 @@ inproc_accept_clients(inproc_ep *srv)
&pair->ref, 2, pair, inproc_pair_destroy);

spipe = cpipe = NULL;
if (((rv = inproc_pipe_alloc(&cpipe, cli)) != 0) ||
((rv = inproc_pipe_alloc(&spipe, srv)) != 0)) {
if (((rv = nni_pipe_alloc_dialer(
(void **) &cpipe, cli->dialer)) != 0) ||
((rv = nni_pipe_alloc_listener(
(void **) &spipe, srv->listener)) != 0)) {

if (cpipe != NULL) {
inproc_pipe_fini(cpipe);
nni_pipe_close(cpipe->pipe);
nni_pipe_rele(cpipe->pipe);
} else {
nni_refcnt_rele(&pair->ref);

Check warning on line 407 in src/sp/transport/inproc/inproc.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/inproc/inproc.c#L404-L407

Added lines #L404 - L407 were not covered by tests
}
if (spipe != NULL) {
inproc_pipe_fini(spipe);
nni_pipe_close(spipe->pipe);
nni_pipe_rele(spipe->pipe);
} else {
nni_refcnt_rele(&pair->ref);

Check warning on line 413 in src/sp/transport/inproc/inproc.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/inproc/inproc.c#L410-L413

Added lines #L410 - L413 were not covered by tests
}
inproc_conn_finish(caio, rv, cli, NULL);
inproc_conn_finish(saio, rv, srv, NULL);
inproc_pair_destroy(pair);
continue;
}

cpipe->proto = cli->proto;
cpipe->addr = cli->addr;
spipe->proto = srv->proto;
spipe->addr = srv->addr;
cpipe->peer = spipe->proto;
spipe->peer = cpipe->proto;
cpipe->pair = pair;
Expand Down Expand Up @@ -626,6 +605,7 @@ inproc_pipe_getopt(
}

static nni_sp_pipe_ops inproc_pipe_ops = {
.p_size = sizeof(inproc_pipe),
.p_init = inproc_pipe_init,
.p_fini = inproc_pipe_fini,
.p_send = inproc_pipe_send,
Expand Down Expand Up @@ -670,6 +650,7 @@ inproc_ep_setopt(
}

static nni_sp_dialer_ops inproc_dialer_ops = {
.d_size = sizeof(inproc_ep),
.d_init = inproc_dialer_init,
.d_fini = inproc_ep_fini,
.d_connect = inproc_ep_connect,
Expand All @@ -680,6 +661,7 @@ static nni_sp_dialer_ops inproc_dialer_ops = {
};

static nni_sp_listener_ops inproc_listener_ops = {
.l_size = sizeof(inproc_ep),
.l_init = inproc_listener_init,
.l_fini = inproc_ep_fini,
.l_bind = inproc_ep_bind,
Expand Down

0 comments on commit 0de15f0

Please sign in to comment.