diff --git a/include/nng/nng.h b/include/nng/nng.h index 62c6e1330..04ef88ba0 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -271,6 +271,7 @@ uint32_t nng_sockaddr_port(const nng_sockaddr *sa); // Only one callback can be set on a given socket, and there is no way // to retrieve the old value. typedef enum { + NNG_PIPE_EV_NONE, // Used internally, must be first, never posted NNG_PIPE_EV_ADD_PRE, // Called just before pipe added to socket NNG_PIPE_EV_ADD_POST, // Called just after pipe added to socket NNG_PIPE_EV_REM_POST, // Called just after pipe removed from socket diff --git a/src/core/pipe.c b/src/core/pipe.c index dac241409..d7cd6bc0e 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -256,13 +256,13 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d, return (NNG_ENOMEM); } - p->p_size = sz; - p->p_proto_ops = *pops; - p->p_tran_ops = *tops; - p->p_sock = sock; - p->p_cbs = false; - p->p_dialer = d; - p->p_listener = l; + p->p_size = sz; + p->p_proto_ops = *pops; + p->p_tran_ops = *tops; + p->p_sock = sock; + p->p_dialer = d; + p->p_listener = l; + p->p_last_event = NNG_PIPE_EV_NONE; // Two references - one for our caller, and // one to be dropped when the pipe is closed. diff --git a/src/core/socket.c b/src/core/socket.c index 6ba5d54e7..d0c18744e 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -104,6 +104,7 @@ struct nni_socket { nni_mtx s_pipe_cbs_mtx; nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM]; + bool s_want_evs; #ifdef NNG_ENABLE_STATS nni_stat_item st_root; // socket scope @@ -1013,10 +1014,16 @@ nni_sock_flags(nni_sock *sock) void nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg) { - if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) { + if ((ev > NNG_PIPE_EV_NONE) && (ev < NNG_PIPE_EV_NUM)) { nni_mtx_lock(&s->s_pipe_cbs_mtx); s->s_pipe_cbs[ev].cb_fn = cb; s->s_pipe_cbs[ev].cb_arg = arg; + s->s_want_evs = false; + for (ev = NNG_PIPE_EV_NONE; ev < NNG_PIPE_EV_NUM; ev++) { + if (s->s_pipe_cbs[ev].cb_fn != NULL) { + s->s_want_evs = true; + } + } nni_mtx_unlock(&s->s_pipe_cbs_mtx); } } @@ -1538,25 +1545,39 @@ nni_pipe_start(nni_pipe *p) void nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev) { - nni_sock *s = p->p_sock; - nng_pipe_cb cb; - void *arg; + nni_sock *s = p->p_sock; + nng_pipe_cb cb; + void *arg; + bool wantevs; + static nni_mtx serialize = NNI_MTX_INITIALIZER; nni_mtx_lock(&s->s_pipe_cbs_mtx); - if (ev == NNG_PIPE_EV_ADD_PRE) { - p->p_cbs = true; - } else if (!p->p_cbs) { - nni_mtx_unlock(&s->s_pipe_cbs_mtx); - return; - } - cb = s->s_pipe_cbs[ev].cb_fn; - arg = s->s_pipe_cbs[ev].cb_arg; + cb = s->s_pipe_cbs[ev].cb_fn; + arg = s->s_pipe_cbs[ev].cb_arg; + wantevs = s->s_want_evs; nni_mtx_unlock(&s->s_pipe_cbs_mtx); - if (cb != NULL) { + if (wantevs) { + nni_mtx_lock(&serialize); + // this pipe never got an event before, so don't start now + if (p->p_last_event == NNG_PIPE_EV_NONE && + ev != NNG_PIPE_EV_ADD_PRE) { + nni_mtx_unlock(&serialize); + return; + } + if (p->p_last_event >= ev) { + // this pipe event already was notified, or a "later" + // one was, so don't go backwards. + nni_mtx_unlock(&serialize); + return; + } + p->p_last_event = ev; nng_pipe pid; pid.id = p->p_id; - cb(pid, ev, arg); + if (cb != NULL) { + cb(pid, ev, arg); + } + nni_mtx_unlock(&serialize); } } diff --git a/src/core/sockimpl.h b/src/core/sockimpl.h index 3140b97b2..92c0bb693 100644 --- a/src/core/sockimpl.h +++ b/src/core/sockimpl.h @@ -1,5 +1,5 @@ // -// Copyright 2023 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -104,9 +104,9 @@ struct nni_pipe { nni_listener *p_listener; nni_atomic_bool p_closed; nni_atomic_flag p_stop; - bool p_cbs; nni_reap_node p_reap; nni_refcnt p_refcnt; + nng_pipe_ev p_last_event; #ifdef NNG_ENABLE_STATS nni_stat_item st_root; @@ -125,7 +125,6 @@ extern int nni_sock_add_listener(nni_sock *, nni_listener *); extern void nni_sock_remove_listener(nni_listener *); extern void nni_sock_remove_dialer(nni_dialer *); -extern void nni_dialer_add_pipe(nni_dialer *, void *); extern void nni_dialer_shutdown(nni_dialer *); extern void nni_dialer_reap(nni_dialer *); extern void nni_dialer_destroy(nni_dialer *); @@ -133,7 +132,6 @@ extern void nni_dialer_timer_start(nni_dialer *); extern void nni_dialer_stop(nni_dialer *); extern void nni_listener_start_pipe(nni_listener *, nni_pipe *); -extern void nni_listener_add_pipe(nni_listener *, void *); extern void nni_listener_shutdown(nni_listener *); extern void nni_listener_reap(nni_listener *); extern void nni_listener_destroy(nni_listener *);