Skip to content

Commit

Permalink
* FIX [mqtt_quic] fix bridge switch of mqtt_quic
Browse files Browse the repository at this point in the history
Signed-off-by: jaylin <[email protected]>
  • Loading branch information
JaylinYu committed Jan 13, 2025
1 parent 3c58263 commit fd1c5a8
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
32 changes: 20 additions & 12 deletions src/mqtt/transport/quic/mqtt_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ struct mqtt_quictran_pipe {
nni_lmq rslmq; // resend lmq
nni_lmq rxlmq; // recv lmq, shared by all streams
nni_mtx mtx;
bool closed;
nni_atomic_bool closed;
bool busy;
#ifdef NNG_HAVE_MQTT_BROKER
nni_msg *connack;
Expand Down Expand Up @@ -156,9 +156,8 @@ mqtt_quictran_pipe_close(void *arg)
mqtt_quictran_pipe *p = arg;

nng_stream_close(p->conn);
nni_atomic_set_bool(&p->closed, true);
nni_mtx_lock(&p->mtx);

p->closed = true;
for (uint8_t i = 0; i < QUIC_SUB_STREAM_NUM; i++)
{
quic_substream *stream = &p->substreams[i];
Expand Down Expand Up @@ -203,7 +202,8 @@ mqtt_quictran_pipe_init(void *arg, nni_pipe *npipe)
nni_lmq_init(&p->rslmq, 1024);
nni_lmq_init(&p->rxlmq, 1024);
p->busy = false;
p->closed = false;
nni_atomic_init_bool(&p->closed);
nni_atomic_set_bool(&p->closed, false);
// set max value by default
p->packmax == 0 ? p->packmax = (uint32_t)0xFFFFFFFF : p->packmax;
p->qosmax == 0 ? p->qosmax = 2 : p->qosmax;
Expand Down Expand Up @@ -569,7 +569,7 @@ mqtt_quictran_share_qos_send_cb(void *arg, nni_aio *qsaio, quic_substream *strea
int rv;

msg = nni_aio_get_msg(qsaio);
if (p->closed) {
if (nni_atomic_get_bool(&p->closed)) {
if (msg != NULL)
nni_msg_free(msg);
return;
Expand Down Expand Up @@ -785,13 +785,13 @@ mqtt_share_pipe_recv_cb(void *arg, nni_aio *rxaio, quic_substream *stream, nni_m
log_info("aio error result %s stream %p", nng_strerror(rv), stream);
if (stream == NULL) {
// set close flag to prevent infinit stream recv
p->closed = true;
nni_atomic_set_bool(&p->closed, true);
// NNI_ASSERT(rv == NNG_ECANCELED);
rv = NNG_ECONNABORTED;
}
goto recv_error;
}
if (p->closed) {
if (nni_atomic_get_bool(&p->closed)) {
goto recv_error;
}
uint16_t *id;
Expand Down Expand Up @@ -1083,7 +1083,7 @@ mqtt_share_pipe_recv_cb(void *arg, nni_aio *rxaio, quic_substream *stream, nni_m
recv_error:
if (aio)
nni_aio_list_remove(aio);
else if (!p->closed) {
else if (!nni_atomic_get_bool(&p->closed)) {
nni_iov sub_iov;
if (stream) {
// nni_msleep(500);
Expand Down Expand Up @@ -1118,7 +1118,7 @@ mqtt_quictran_pipe_send_prior(mqtt_quictran_pipe *p, nni_aio *aio)
int niov;
nni_iov iov[3];

if (p->closed) {
if (nni_atomic_get_bool(&p->closed)) {
nni_aio_finish_error(aio, SERVER_SHUTTING_DOWN);
return;
}
Expand Down Expand Up @@ -1199,7 +1199,7 @@ mqtt_quictran_pipe_send_start(mqtt_quictran_pipe *p)
int niov;
nni_iov iov[3];

if (p->closed) {
if (nni_atomic_get_bool(&p->closed)) {
while ((aio = nni_list_first(&p->sendq)) != NULL) {
nni_list_remove(&p->sendq, aio);
nni_aio_finish_error(aio, SERVER_SHUTTING_DOWN);
Expand Down Expand Up @@ -1268,6 +1268,10 @@ mqtt_quictran_pipe_send(void *arg, nni_aio *aio)
mqtt_quictran_pipe *p = arg;
int rv;

if (nni_atomic_get_bool(&p->closed)) {
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
nni_mtx_lock(&p->mtx);
// Priority msg
int *flags = nni_aio_get_prov_data(aio);
Expand Down Expand Up @@ -1322,7 +1326,7 @@ mqtt_quictran_pipe_recv_start(mqtt_quictran_pipe *p, nni_aio *aio)
{
nni_iov iov;

if (p->closed) {
if (nni_atomic_get_bool(&p->closed)) {
while ((aio = nni_list_first(&p->recvq)) != NULL) {
nni_list_remove(&p->recvq, aio);
nni_aio_finish_error(aio, NNG_ECONNABORTED);
Expand Down Expand Up @@ -1794,6 +1798,10 @@ mqtt_quictran_ep_connect(void *arg, nni_aio *aio)
log_error("ep connect rv %d", rv);
return;
}
if (ep->closed) {
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
if (ep->backoff != 0) {
ep->backoff = ep->backoff * 2;
ep->backoff = ep->backoff > ep->backoff_max
Expand Down Expand Up @@ -1929,7 +1937,7 @@ mqtt_quictran_ep_set_ep_closed(void *arg, const void *v, size_t sz, nni_opt_type
if (tmp == true) {
mqtt_quictran_pipe *p;
NNI_LIST_FOREACH (&ep->busypipes, p) {
mqtt_quictran_pipe_close(p);
nni_pipe_close(p->npipe);
}
}
nni_mtx_unlock(&ep->mtx);
Expand Down
3 changes: 1 addition & 2 deletions src/mqtt/transport/tls/mqtt_tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -953,8 +953,7 @@ mqtts_tcptran_pipe_send(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&p->mtx);
if ((rv = nni_aio_schedule(aio, mqtts_tcptran_pipe_send_cancel, p)) !=
0) {
if ((rv = nni_aio_schedule(aio, mqtts_tcptran_pipe_send_cancel, p)) != 0) {
nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
return;
Expand Down

0 comments on commit fd1c5a8

Please sign in to comment.