Skip to content

Commit

Permalink
* MDF [mqtt/protocol] return submsg when schedule failed & retry
Browse files Browse the repository at this point in the history
Signed-off-by: jaylin <[email protected]>
  • Loading branch information
JaylinYu committed Jan 24, 2025
1 parent 1b5fad6 commit a01776c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
13 changes: 10 additions & 3 deletions src/mqtt/protocol/mqtt/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -540,13 +540,20 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg)
if ((rv = nni_aio_schedule(aio, mqtt_ctx_cancel_send, ctx)) != 0) {
log_warn("Cancel_Func scheduling failed, send abort!");
nni_id_remove(&s->sent_unack, packet_id);
nni_aio_set_msg(aio, NULL);
if (ptype == NNG_MQTT_SUBSCRIBE) {
nni_aio_set_msg(aio, msg); // only preserve subscribe msg
} else {
nni_aio_set_msg(aio, NULL);
nni_msg_free(msg); // User need to realloc this msg again
}
nni_mtx_unlock(&s->mtx);
nni_msg_free(msg); // User need to realloc this msg again
#ifdef NNG_ENABLE_STATS
nni_stat_inc(&s->msg_send_drop, 1);
#endif
nni_aio_finish_error(aio, rv);
if (ptype == NNG_MQTT_SUBSCRIBE)
nni_aio_finish_error(aio, rv);
else
nni_aio_finish(aio, rv, 0);
return;
}
// pass proto_data to cached aio, either it is freed in ack or in cancel
Expand Down
15 changes: 12 additions & 3 deletions src/supplemental/mqtt/mqtt_public.c
Original file line number Diff line number Diff line change
Expand Up @@ -847,14 +847,23 @@ nng_mqtt_client_send_cb(void* arg)

nni_lmq * lmq = (nni_lmq *)client->msgq;
// in case of data conention while fini pipes
if (nng_aio_result(aio) == NNG_ECLOSED)
return;
int rv = nng_aio_result(aio);

nng_msg * msg = nng_aio_get_msg(aio);
if (msg == NULL || nng_aio_result(aio) != 0) {
if (msg == NULL && rv != 0) {
log_warn("bridge send aio rv %d", rv);
client->cb(client, NULL, client->obj);
return;
}
if (rv != 0) {
if (msg != NULL && nni_mqtt_msg_get_packet_type(msg) == NNG_MQTT_SUBSCRIBE) {
nng_aio_set_msg(client->send_aio, msg);
log_info("resend subscribe msg again!");
nng_send_aio(client->sock, client->send_aio);
return;
}
}


if (nni_lmq_get(lmq, &tmsg) == 0) {
nng_aio_set_msg(client->send_aio, tmsg);
Expand Down

0 comments on commit a01776c

Please sign in to comment.