diff --git a/src/sp/transport/inproc/inproc.c b/src/sp/transport/inproc/inproc.c index 9b6b3b4d7..1dec45e39 100644 --- a/src/sp/transport/inproc/inproc.c +++ b/src/sp/transport/inproc/inproc.c @@ -35,6 +35,7 @@ struct inproc_pipe { inproc_queue *send_queue; uint16_t peer; uint16_t proto; + nni_pipe *pipe; }; struct inproc_queue { @@ -53,7 +54,8 @@ struct inproc_pair { struct inproc_ep { const char *addr; - bool listener; + nni_listener *listener; + nni_dialer *dialer; nni_list_node node; uint16_t proto; nni_cv cv; @@ -86,32 +88,16 @@ static void inproc_pair_destroy(void *arg) { inproc_pair *pair = arg; - for (int i = 0; i < 2; i++) { - nni_mtx_fini(&pair->queues[i].lock); - } + nni_mtx_fini(&pair->queues[0].lock); + nni_mtx_fini(&pair->queues[1].lock); NNI_FREE_STRUCT(pair); } -static int -inproc_pipe_alloc(inproc_pipe **pipep, inproc_ep *ep) -{ - inproc_pipe *pipe; - - if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { - return (NNG_ENOMEM); - } - - pipe->proto = ep->proto; - pipe->addr = ep->addr; - *pipep = pipe; - return (0); -} - static int inproc_pipe_init(void *arg, nni_pipe *p) { - NNI_ARG_UNUSED(arg); - NNI_ARG_UNUSED(p); + inproc_pipe *pipe = arg; + pipe->pipe = p; return (0); } @@ -131,8 +117,6 @@ inproc_pipe_fini(void *arg) // If we are the last peer, then toss the pair structure. nni_refcnt_rele(&pair->ref); } - - NNI_FREE_STRUCT(pipe); } static void @@ -287,49 +271,34 @@ inproc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t) return (nni_copyout_sockaddr(&sa, buf, szp, t)); } -static int -inproc_dialer_init(void **epp, nng_url *url, nni_dialer *ndialer) +static void +inproc_ep_init(inproc_ep *ep, nni_sock *sock, const nng_url *url) { - inproc_ep *ep; - nni_sock *sock = nni_dialer_sock(ndialer); - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } nni_mtx_init(&ep->mtx); - - ep->listener = false; - ep->proto = nni_sock_proto_id(sock); - ep->rcvmax = 0; + ep->proto = nni_sock_proto_id(sock); + ep->rcvmax = 0; NNI_LIST_INIT(&ep->clients, inproc_ep, node); nni_aio_list_init(&ep->aios); - ep->addr = url->u_path; // we match on the URL path. +} + +static int +inproc_dialer_init(void **epp, nng_url *url, nni_dialer *ndialer) +{ + inproc_ep *ep = (void *) epp; - *epp = ep; + ep->dialer = ndialer; + inproc_ep_init(ep, nni_dialer_sock(ndialer), url); return (0); } static int inproc_listener_init(void **epp, nng_url *url, nni_listener *nlistener) { - inproc_ep *ep; - nni_sock *sock = nni_listener_sock(nlistener); - - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&ep->mtx); - - ep->listener = true; - ep->proto = nni_sock_proto_id(sock); - ep->rcvmax = 0; - NNI_LIST_INIT(&ep->clients, inproc_ep, node); - nni_aio_list_init(&ep->aios); - - ep->addr = url->u_path; // we match on the path + inproc_ep *ep = (void *) epp; - *epp = ep; + ep->listener = nlistener; + inproc_ep_init(ep, nni_listener_sock(nlistener), url); return (0); } @@ -344,7 +313,6 @@ inproc_ep_fini(void *arg) { inproc_ep *ep = arg; nni_mtx_fini(&ep->mtx); - NNI_FREE_STRUCT(ep); } static void @@ -352,12 +320,12 @@ inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe) { nni_aio_list_remove(aio); - if ((!ep->listener) && nni_list_empty(&ep->aios)) { + if ((ep->listener == NULL) && nni_list_empty(&ep->aios)) { nni_list_node_remove(&ep->node); } if (rv == 0) { - nni_aio_set_output(aio, 0, pipe); + nni_aio_set_output(aio, 0, pipe->pipe); nni_aio_finish(aio, 0, 0); } else { NNI_ASSERT(pipe == NULL); @@ -427,21 +395,32 @@ inproc_accept_clients(inproc_ep *srv) &pair->ref, 2, pair, inproc_pair_destroy); spipe = cpipe = NULL; - if (((rv = inproc_pipe_alloc(&cpipe, cli)) != 0) || - ((rv = inproc_pipe_alloc(&spipe, srv)) != 0)) { + if (((rv = nni_pipe_alloc_dialer( + (void **) &cpipe, cli->dialer)) != 0) || + ((rv = nni_pipe_alloc_listener( + (void **) &spipe, srv->listener)) != 0)) { if (cpipe != NULL) { - inproc_pipe_fini(cpipe); + nni_pipe_close(cpipe->pipe); + nni_pipe_rele(cpipe->pipe); + } else { + nni_refcnt_rele(&pair->ref); } if (spipe != NULL) { - inproc_pipe_fini(spipe); + nni_pipe_close(spipe->pipe); + nni_pipe_rele(spipe->pipe); + } else { + nni_refcnt_rele(&pair->ref); } inproc_conn_finish(caio, rv, cli, NULL); inproc_conn_finish(saio, rv, srv, NULL); - inproc_pair_destroy(pair); continue; } + cpipe->proto = cli->proto; + cpipe->addr = cli->addr; + spipe->proto = srv->proto; + spipe->addr = srv->addr; cpipe->peer = spipe->proto; spipe->peer = cpipe->proto; cpipe->pair = pair; @@ -626,6 +605,7 @@ inproc_pipe_getopt( } static nni_sp_pipe_ops inproc_pipe_ops = { + .p_size = sizeof(inproc_pipe), .p_init = inproc_pipe_init, .p_fini = inproc_pipe_fini, .p_send = inproc_pipe_send, @@ -670,6 +650,7 @@ inproc_ep_setopt( } static nni_sp_dialer_ops inproc_dialer_ops = { + .d_size = sizeof(inproc_ep), .d_init = inproc_dialer_init, .d_fini = inproc_ep_fini, .d_connect = inproc_ep_connect, @@ -680,6 +661,7 @@ static nni_sp_dialer_ops inproc_dialer_ops = { }; static nni_sp_listener_ops inproc_listener_ops = { + .l_size = sizeof(inproc_ep), .l_init = inproc_listener_init, .l_fini = inproc_ep_fini, .l_bind = inproc_ep_bind,