Skip to content

Commit

Permalink
streams: add explicit stop functions
Browse files Browse the repository at this point in the history
This allows us to explicitly stop streams, dialers, and listeners,
before we start tearing down things. This hopefully will be useful
in resolving use-after-free bugs in http, tls, and websockets.

The new functions are not yet documented, but they are
nng_stream_stop, nng_stream_dialer_stop, and nng_stream_listener_stop.
They should be called after close, and before free.  The close
functions now close without blocking, but the stop function is
allowed to block.
  • Loading branch information
gdamore committed Dec 13, 2024
1 parent 371eede commit 81f5d3c
Show file tree
Hide file tree
Showing 31 changed files with 424 additions and 114 deletions.
7 changes: 6 additions & 1 deletion demo/stream/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ int
client(const char *url)
{
nng_stream_dialer *dialer;
nng_aio * aio;
nng_aio *aio;
nng_iov iov;
int rv;

Expand Down Expand Up @@ -102,6 +102,11 @@ client(const char *url)

// Send ELCOSE to send/recv associated wit this stream
free(iov.iov_buf);

// stop everything before freeing
nng_stream_stop(c1);
nng_stream_dialer_stop(dialer);

nng_stream_free(c1);
nng_aio_free(aio);
nng_stream_dialer_free(dialer);
Expand Down
3 changes: 3 additions & 0 deletions include/nng/nng.h
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,7 @@ typedef struct nng_stream_listener nng_stream_listener;

NNG_DECL void nng_stream_free(nng_stream *);
NNG_DECL void nng_stream_close(nng_stream *);
NNG_DECL void nng_stream_stop(nng_stream *);
NNG_DECL void nng_stream_send(nng_stream *, nng_aio *);
NNG_DECL void nng_stream_recv(nng_stream *, nng_aio *);
NNG_DECL int nng_stream_get_bool(nng_stream *, const char *, bool *);
Expand All @@ -1152,6 +1153,7 @@ NNG_DECL int nng_stream_dialer_alloc_url(
nng_stream_dialer **, const nng_url *);
NNG_DECL void nng_stream_dialer_free(nng_stream_dialer *);
NNG_DECL void nng_stream_dialer_close(nng_stream_dialer *);
NNG_DECL void nng_stream_dialer_stop(nng_stream_dialer *);
NNG_DECL void nng_stream_dialer_dial(nng_stream_dialer *, nng_aio *);
NNG_DECL int nng_stream_dialer_get_bool(
nng_stream_dialer *, const char *, bool *);
Expand Down Expand Up @@ -1193,6 +1195,7 @@ NNG_DECL int nng_stream_listener_alloc_url(
nng_stream_listener **, const nng_url *);
NNG_DECL void nng_stream_listener_free(nng_stream_listener *);
NNG_DECL void nng_stream_listener_close(nng_stream_listener *);
NNG_DECL void nng_stream_listener_stop(nng_stream_listener *);
NNG_DECL int nng_stream_listener_listen(nng_stream_listener *);
NNG_DECL void nng_stream_listener_accept(nng_stream_listener *, nng_aio *);
NNG_DECL int nng_stream_listener_get_bool(
Expand Down
5 changes: 5 additions & 0 deletions src/core/platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ extern void nni_tcp_dialer_fini(nni_tcp_dialer *);
// Further operations on it should return NNG_ECLOSED.
// Any in-progress connection will be aborted.
extern void nni_tcp_dialer_close(nni_tcp_dialer *);
extern void nni_tcp_dialer_stop(nni_tcp_dialer *);

// nni_tcp_dial attempts to create an outgoing connection,
// asynchronously, to the address in the aio. On success, the first (and only)
Expand All @@ -318,6 +319,10 @@ extern void nni_tcp_listener_fini(nni_tcp_listener *);
// any bound socket, and further operations will result in NNG_ECLOSED.
extern void nni_tcp_listener_close(nni_tcp_listener *);

// nni_tcp_listener_stop is close + waits for any operations to stop,
// so there won't be any further accepts after this.
extern void nni_tcp_listener_stop(nni_tcp_listener *);

// nni_tcp_listener_listen creates the socket in listening mode, bound
// to the specified address.
extern int nni_tcp_listener_listen(nni_tcp_listener *, const nni_sockaddr *);
Expand Down
7 changes: 7 additions & 0 deletions src/core/sockfd.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ sfd_listener_close(void *arg)
nni_mtx_unlock(&l->mtx);
}

static void
sfd_listener_stop(void *arg)

Check warning on line 67 in src/core/sockfd.c

View check run for this annotation

Codecov / codecov/patch

src/core/sockfd.c#L67

Added line #L67 was not covered by tests
{
sfd_listener_close(arg);
}

Check warning on line 70 in src/core/sockfd.c

View check run for this annotation

Codecov / codecov/patch

src/core/sockfd.c#L69-L70

Added lines #L69 - L70 were not covered by tests

static int
sfd_listener_listen(void *arg)
{
Expand Down Expand Up @@ -222,6 +228,7 @@ nni_sfd_listener_alloc(nng_stream_listener **lp, const nng_url *url)

l->ops.sl_free = sfd_listener_free;
l->ops.sl_close = sfd_listener_close;
l->ops.sl_stop = sfd_listener_stop;
l->ops.sl_listen = sfd_listener_listen;
l->ops.sl_accept = sfd_listener_accept;
l->ops.sl_get = sfd_listener_get;
Expand Down
37 changes: 34 additions & 3 deletions src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,17 @@ static struct {
void
nng_stream_close(nng_stream *s)
{
s->s_close(s);
if (s != NULL) {
s->s_close(s);
}
}

void
nng_stream_stop(nng_stream *s)
{
if (s != NULL) {
s->s_stop(s);
}
}

void
Expand Down Expand Up @@ -149,7 +159,17 @@ nni_stream_get(
void
nng_stream_dialer_close(nng_stream_dialer *d)
{
d->sd_close(d);
if (d != NULL) {
d->sd_close(d);
}
}

void
nng_stream_dialer_stop(nng_stream_dialer *d)
{
if (d != NULL) {
d->sd_stop(d);
}
}

void
Expand Down Expand Up @@ -226,8 +246,19 @@ nni_stream_dialer_set_tls(nng_stream_dialer *d, nng_tls_config *cfg)
void
nng_stream_listener_close(nng_stream_listener *l)
{
l->sl_close(l);
if (l != NULL) {
l->sl_close(l);
}
}

void
nng_stream_listener_stop(nng_stream_listener *l)
{
if (l != NULL) {
l->sl_stop(l);
}
}

void
nng_stream_listener_free(nng_stream_listener *l)
{
Expand Down
5 changes: 4 additions & 1 deletion src/core/stream.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2020 Staysail Systems, Inc. <[email protected]>
// Copyright 2024 Staysail Systems, Inc. <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
Expand Down Expand Up @@ -43,6 +43,7 @@ extern int nni_stream_listener_get_tls(
struct nng_stream {
void (*s_free)(void *);
void (*s_close)(void *);
void (*s_stop)(void *);
void (*s_recv)(void *, nng_aio *);
void (*s_send)(void *, nng_aio *);
int (*s_get)(void *, const char *, void *, size_t *, nni_type);
Expand All @@ -53,6 +54,7 @@ struct nng_stream {
struct nng_stream_dialer {
void (*sd_free)(void *);
void (*sd_close)(void *);
void (*sd_stop)(void *);
void (*sd_dial)(void *, nng_aio *);
int (*sd_get)(void *, const char *, void *, size_t *, nni_type);
int (*sd_set)(void *, const char *, const void *, size_t, nni_type);
Expand All @@ -65,6 +67,7 @@ struct nng_stream_dialer {
struct nng_stream_listener {
void (*sl_free)(void *);
void (*sl_close)(void *);
void (*sl_stop)(void *);
int (*sl_listen)(void *);
void (*sl_accept)(void *, nng_aio *);
int (*sl_get)(void *, const char *, void *, size_t *, nni_type);
Expand Down
37 changes: 30 additions & 7 deletions src/core/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ tcp_dial_con_cb(void *arg)
if ((d->closed) || ((aio = nni_list_first(&d->conaios)) == NULL)) {
if (rv == 0) {
// Make sure we discard the underlying connection.
nng_stream_close(nni_aio_get_output(d->conaio, 0));
nng_stream_stop(nni_aio_get_output(d->conaio, 0));

Check warning on line 108 in src/core/tcp.c

View check run for this annotation

Codecov / codecov/patch

src/core/tcp.c#L107-L108

Added lines #L107 - L108 were not covered by tests
nng_stream_free(nni_aio_get_output(d->conaio, 0));
nni_aio_set_output(d->conaio, 0, NULL);
}
Expand All @@ -127,14 +129,26 @@ tcp_dialer_close(void *arg)
{
tcp_dialer *d = arg;
nni_aio *aio;
nni_mtx_lock(&d->mtx);
d->closed = true;
while ((aio = nni_list_first(&d->conaios)) != NULL) {
nni_list_remove(&d->conaios, aio);
nni_aio_finish_error(aio, NNG_ECLOSED);

if (d != NULL) {
nni_mtx_lock(&d->mtx);
d->closed = true;
while ((aio = nni_list_first(&d->conaios)) != NULL) {
nni_list_remove(&d->conaios, aio);
nni_aio_finish_error(aio, NNG_ECLOSED);
}
nni_tcp_dialer_close(d->d);
nni_mtx_unlock(&d->mtx);
}
}

static void
tcp_dialer_stop(void *arg)
{
tcp_dialer *d = arg;
if (d != NULL) {
nni_tcp_dialer_stop(d->d);
}
nni_tcp_dialer_close(d->d);
nni_mtx_unlock(&d->mtx);
}

static void
Expand Down Expand Up @@ -222,6 +236,7 @@ tcp_dialer_alloc(tcp_dialer **dp)

d->ops.sd_close = tcp_dialer_close;
d->ops.sd_free = tcp_dialer_free;
d->ops.sd_stop = tcp_dialer_stop;
d->ops.sd_dial = tcp_dialer_dial;
d->ops.sd_get = tcp_dialer_get;
d->ops.sd_set = tcp_dialer_set;
Expand Down Expand Up @@ -275,6 +290,13 @@ tcp_listener_close(void *arg)
nni_tcp_listener_close(l->l);
}

static void
tcp_listener_stop(void *arg)
{
tcp_listener *l = arg;
nni_tcp_listener_stop(l->l);
}

static void
tcp_listener_free(void *arg)
{
Expand Down Expand Up @@ -372,6 +394,7 @@ tcp_listener_alloc_addr(nng_stream_listener **lp, const nng_sockaddr *sa)

l->ops.sl_free = tcp_listener_free;
l->ops.sl_close = tcp_listener_close;
l->ops.sl_stop = tcp_listener_stop;
l->ops.sl_listen = tcp_listener_listen;
l->ops.sl_accept = tcp_listener_accept;
l->ops.sl_get = tcp_listener_get;
Expand Down
25 changes: 20 additions & 5 deletions src/platform/posix/posix_ipcconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -386,15 +386,29 @@ nni_posix_ipc_start(nni_ipc_conn *c)
}

static void
ipc_reap(void *arg)
ipc_stop(void *arg)
{
ipc_conn *c = arg;
ipc_conn *c = arg;
nni_posix_pfd *pfd;

ipc_close(c);
if (c->pfd != NULL) {
nni_posix_pfd_fini(c->pfd);
nni_mtx_lock(&c->mtx);
pfd = c->pfd;
c->pfd = NULL;
nni_mtx_unlock(&c->mtx);

if (pfd != NULL) {
nni_posix_pfd_fini(pfd);
}
nni_mtx_fini(&c->mtx);
}

static void
ipc_reap(void *arg)
{
ipc_conn *c = arg;
ipc_stop(c);

nni_mtx_fini(&c->mtx);
if (c->dialer != NULL) {
nni_posix_ipc_dialer_rele(c->dialer);
}
Expand Down Expand Up @@ -470,6 +484,7 @@ nni_posix_ipc_alloc(nni_ipc_conn **cp, nni_sockaddr *sa, nni_ipc_dialer *d)
c->dialer = d;
c->stream.s_free = ipc_free;
c->stream.s_close = ipc_close;
c->stream.s_stop = ipc_stop;
c->stream.s_send = ipc_send;
c->stream.s_recv = ipc_recv;
c->stream.s_get = ipc_get;
Expand Down
14 changes: 13 additions & 1 deletion src/platform/posix/posix_ipcdial.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ ipc_dialer_close(void *arg)
c->dial_aio = NULL;
nni_aio_set_prov_data(aio, NULL);
nng_stream_close(&c->stream);
nng_stream_stop(&c->stream);

Check warning on line 44 in src/platform/posix/posix_ipcdial.c

View check run for this annotation

Codecov / codecov/patch

src/platform/posix/posix_ipcdial.c#L44

Added line #L44 was not covered by tests
nng_stream_free(&c->stream);
}
nni_aio_finish_error(aio, NNG_ECLOSED);
Expand All @@ -49,6 +50,13 @@ ipc_dialer_close(void *arg)
nni_mtx_unlock(&d->mtx);
}

static void
ipc_dialer_stop(void *arg)
{
nni_ipc_dialer *d = arg;
ipc_dialer_close(d);
}

static void
ipc_dialer_fini(ipc_dialer *d)
{
Expand All @@ -61,7 +69,7 @@ ipc_dialer_free(void *arg)
{
ipc_dialer *d = arg;

ipc_dialer_close(d);
ipc_dialer_stop(d);
nni_atomic_set_bool(&d->fini, true);
nni_posix_ipc_dialer_rele(d);
}
Expand Down Expand Up @@ -94,6 +102,7 @@ ipc_dialer_cancel(nni_aio *aio, void *arg, int rv)
nni_mtx_unlock(&d->mtx);

nni_aio_finish_error(aio, rv);
nng_stream_stop(&c->stream);

Check warning on line 105 in src/platform/posix/posix_ipcdial.c

View check run for this annotation

Codecov / codecov/patch

src/platform/posix/posix_ipcdial.c#L105

Added line #L105 was not covered by tests
nng_stream_free(&c->stream);
}

Expand Down Expand Up @@ -138,6 +147,7 @@ ipc_dialer_cb(nni_posix_pfd *pfd, unsigned ev, void *arg)

if (rv != 0) {
nng_stream_close(&c->stream);
nng_stream_stop(&c->stream);

Check warning on line 150 in src/platform/posix/posix_ipcdial.c

View check run for this annotation

Codecov / codecov/patch

src/platform/posix/posix_ipcdial.c#L150

Added line #L150 was not covered by tests
nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
return;
Expand Down Expand Up @@ -236,6 +246,7 @@ ipc_dialer_dial(void *arg, nni_aio *aio)
error:
nni_aio_set_prov_data(aio, NULL);
nni_mtx_unlock(&d->mtx);
nng_stream_stop(&c->stream);
nng_stream_free(&c->stream);
nni_aio_finish_error(aio, rv);
}
Expand Down Expand Up @@ -319,6 +330,7 @@ nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
d->closed = false;
d->sd.sd_free = ipc_dialer_free;
d->sd.sd_close = ipc_dialer_close;
d->sd.sd_stop = ipc_dialer_stop;
d->sd.sd_dial = ipc_dialer_dial;
d->sd.sd_get = ipc_dialer_get;
d->sd.sd_set = ipc_dialer_set;
Expand Down
Loading

0 comments on commit 81f5d3c

Please sign in to comment.