Skip to content

Commit

Permalink
fixes #961 surprising pipe event order
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Dec 29, 2024
1 parent bef6b37 commit 67b4cea
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 25 deletions.
1 change: 1 addition & 0 deletions include/nng/nng.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ uint32_t nng_sockaddr_port(const nng_sockaddr *sa);
// Only one callback can be set on a given socket, and there is no way
// to retrieve the old value.
typedef enum {
NNG_PIPE_EV_NONE, // Used internally, must be first, never posted
NNG_PIPE_EV_ADD_PRE, // Called just before pipe added to socket
NNG_PIPE_EV_ADD_POST, // Called just after pipe added to socket
NNG_PIPE_EV_REM_POST, // Called just after pipe removed from socket
Expand Down
14 changes: 7 additions & 7 deletions src/core/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,13 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, nni_dialer *d,
return (NNG_ENOMEM);
}

p->p_size = sz;
p->p_proto_ops = *pops;
p->p_tran_ops = *tops;
p->p_sock = sock;
p->p_cbs = false;
p->p_dialer = d;
p->p_listener = l;
p->p_size = sz;
p->p_proto_ops = *pops;
p->p_tran_ops = *tops;
p->p_sock = sock;
p->p_dialer = d;
p->p_listener = l;
p->p_last_event = NNG_PIPE_EV_NONE;

// Two references - one for our caller, and
// one to be dropped when the pipe is closed.
Expand Down
49 changes: 35 additions & 14 deletions src/core/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ struct nni_socket {

nni_mtx s_pipe_cbs_mtx;
nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
bool s_want_evs;

#ifdef NNG_ENABLE_STATS
nni_stat_item st_root; // socket scope
Expand Down Expand Up @@ -1013,10 +1014,16 @@ nni_sock_flags(nni_sock *sock)
void
nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg)
{
if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) {
if ((ev > NNG_PIPE_EV_NONE) && (ev < NNG_PIPE_EV_NUM)) {
nni_mtx_lock(&s->s_pipe_cbs_mtx);
s->s_pipe_cbs[ev].cb_fn = cb;
s->s_pipe_cbs[ev].cb_arg = arg;
s->s_want_evs = false;
for (ev = NNG_PIPE_EV_NONE; ev < NNG_PIPE_EV_NUM; ev++) {
if (s->s_pipe_cbs[ev].cb_fn != NULL) {
s->s_want_evs = true;
}
}
nni_mtx_unlock(&s->s_pipe_cbs_mtx);
}
}
Expand Down Expand Up @@ -1538,25 +1545,39 @@ nni_pipe_start(nni_pipe *p)
void
nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
{
nni_sock *s = p->p_sock;
nng_pipe_cb cb;
void *arg;
nni_sock *s = p->p_sock;
nng_pipe_cb cb;
void *arg;
bool wantevs;
static nni_mtx serialize = NNI_MTX_INITIALIZER;

nni_mtx_lock(&s->s_pipe_cbs_mtx);
if (ev == NNG_PIPE_EV_ADD_PRE) {
p->p_cbs = true;
} else if (!p->p_cbs) {
nni_mtx_unlock(&s->s_pipe_cbs_mtx);
return;
}
cb = s->s_pipe_cbs[ev].cb_fn;
arg = s->s_pipe_cbs[ev].cb_arg;
cb = s->s_pipe_cbs[ev].cb_fn;
arg = s->s_pipe_cbs[ev].cb_arg;
wantevs = s->s_want_evs;
nni_mtx_unlock(&s->s_pipe_cbs_mtx);

if (cb != NULL) {
if (wantevs) {
nni_mtx_lock(&serialize);
// this pipe never got an event before, so don't start now
if (p->p_last_event == NNG_PIPE_EV_NONE &&
ev != NNG_PIPE_EV_ADD_PRE) {
nni_mtx_unlock(&serialize);
return;

Check warning on line 1566 in src/core/socket.c

View check run for this annotation

Codecov / codecov/patch

src/core/socket.c#L1565-L1566

Added lines #L1565 - L1566 were not covered by tests
}
if (p->p_last_event >= ev) {
// this pipe event already was notified, or a "later"
// one was, so don't go backwards.
nni_mtx_unlock(&serialize);
return;

Check warning on line 1572 in src/core/socket.c

View check run for this annotation

Codecov / codecov/patch

src/core/socket.c#L1571-L1572

Added lines #L1571 - L1572 were not covered by tests
}
p->p_last_event = ev;
nng_pipe pid;
pid.id = p->p_id;
cb(pid, ev, arg);
if (cb != NULL) {
cb(pid, ev, arg);
}
nni_mtx_unlock(&serialize);
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/core/sockimpl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2023 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -104,9 +104,9 @@ struct nni_pipe {
nni_listener *p_listener;
nni_atomic_bool p_closed;
nni_atomic_flag p_stop;
bool p_cbs;
nni_reap_node p_reap;
nni_refcnt p_refcnt;
nng_pipe_ev p_last_event;

#ifdef NNG_ENABLE_STATS
nni_stat_item st_root;
Expand All @@ -125,15 +125,13 @@ extern int nni_sock_add_listener(nni_sock *, nni_listener *);
extern void nni_sock_remove_listener(nni_listener *);
extern void nni_sock_remove_dialer(nni_dialer *);

extern void nni_dialer_add_pipe(nni_dialer *, void *);
extern void nni_dialer_shutdown(nni_dialer *);
extern void nni_dialer_reap(nni_dialer *);
extern void nni_dialer_destroy(nni_dialer *);
extern void nni_dialer_timer_start(nni_dialer *);
extern void nni_dialer_stop(nni_dialer *);

extern void nni_listener_start_pipe(nni_listener *, nni_pipe *);
extern void nni_listener_add_pipe(nni_listener *, void *);
extern void nni_listener_shutdown(nni_listener *);
extern void nni_listener_reap(nni_listener *);
extern void nni_listener_destroy(nni_listener *);
Expand Down

0 comments on commit 67b4cea

Please sign in to comment.