Skip to content

Commit

Permalink
* FIX [mqtt/transport] fix nanomsg#285 nanomsg#308 implement keepali…
Browse files Browse the repository at this point in the history
…ve timer in transport layer
  • Loading branch information
JaylinYu committed Feb 3, 2022
1 parent b4da47c commit 567044c
Showing 1 changed file with 41 additions and 10 deletions.
51 changes: 41 additions & 10 deletions src/mqtt/transport/tcp/mqtt_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ struct mqtt_tcptran_pipe {
nni_pipe * npipe;
uint16_t peer;
uint16_t proto;
uint16_t keepalive; //ms
size_t rcvmax;
bool closed;
bool busy;
nni_list_node node;
mqtt_tcptran_ep *ep;
nni_atomic_flag reaped;
Expand All @@ -45,17 +47,16 @@ struct mqtt_tcptran_pipe {
size_t wantrxhead;
nni_list recvq;
nni_list sendq;
nni_aio tmaio;
nni_aio * txaio;
nni_aio * rxaio;
nni_aio * rsaio; // aio for puback/pubrecv
nni_aio * qsaio; // aio for pubrel
nni_aio * rpaio; // aio for pubcomp
nni_aio * qsaio; // aio for pubrel/pingreq
nni_lmq rslmq;
nni_aio * negoaio;
nni_msg * rxmsg;
nni_msg * smsg;
nni_mtx mtx;
bool busy;
};

struct mqtt_tcptran_ep {
Expand Down Expand Up @@ -128,6 +129,32 @@ nni_msg_get_pub_pid(nni_msg *m)
return pid;
}

static void
mqtt_pipe_timer_cb(void *arg)
{
mqtt_tcptran_pipe *p = arg;
uint8_t buf[2];

if (nng_aio_result(&p->tmaio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
if (!p->busy) {
// send pingreq
buf[0] = 0xC0;
buf[1] = 0x00;

nni_iov iov;
iov.iov_len = 2;
iov.iov_buf = &buf;
// send it down...
nni_aio_set_iov(p->qsaio, 1, &iov);
nng_stream_send(p->conn, p->qsaio);
}
nni_mtx_unlock(&p->mtx);
nni_sleep_aio(p->keepalive, &p->tmaio);
}

static void
mqtt_tcptran_pipe_close(void *arg)
{
Expand All @@ -141,8 +168,8 @@ mqtt_tcptran_pipe_close(void *arg)
nni_aio_close(p->rsaio);
nni_aio_close(p->qsaio);
nni_aio_close(p->txaio);
nni_aio_close(p->rpaio);
nni_aio_close(p->negoaio);
nni_aio_close(&p->tmaio);

nng_stream_close(p->conn);
}
Expand All @@ -155,9 +182,9 @@ mqtt_tcptran_pipe_stop(void *arg)
nni_aio_stop(p->rxaio);
nni_aio_stop(p->rsaio);
nni_aio_stop(p->qsaio);
nni_aio_stop(p->rpaio);
nni_aio_stop(p->txaio);
nni_aio_stop(p->negoaio);
nni_aio_stop(&p->tmaio);
}

static int
Expand All @@ -167,7 +194,9 @@ mqtt_tcptran_pipe_init(void *arg, nni_pipe *npipe)
p->npipe = npipe;

nni_lmq_init(&p->rslmq, 1024); // FIXME: remove hard code value
nni_aio_init(&p->tmaio, mqtt_pipe_timer_cb, p);
p->busy = false;
nni_sleep_aio(p->keepalive, &p->tmaio);
return (0);
}

Expand All @@ -192,12 +221,12 @@ mqtt_tcptran_pipe_fini(void *arg)
nni_aio_free(p->txaio);
nni_aio_free(p->rsaio);
nni_aio_free(p->qsaio);
nni_aio_free(p->rpaio);
nni_aio_free(p->negoaio);
nng_stream_free(p->conn);
nni_msg_free(p->rxmsg);
nni_lmq_fini(&p->rslmq);
nni_mtx_fini(&p->mtx);
nni_aio_fini(&p->tmaio);
NNI_FREE_STRUCT(p);
}

Expand Down Expand Up @@ -229,7 +258,6 @@ mqtt_tcptran_pipe_alloc(mqtt_tcptran_pipe **pipep)
((rv = nni_aio_alloc(&p->rsaio, NULL, p)) != 0) ||
((rv = nni_aio_alloc(
&p->qsaio, mqtt_tcptran_pipe_qos_send_cb, p)) != 0) ||
((rv = nni_aio_alloc(&p->rpaio, NULL, p)) != 0) ||
((rv = nni_aio_alloc(&p->negoaio, mqtt_tcptran_pipe_nego_cb, p)) !=
0)) {
mqtt_tcptran_pipe_fini(p);
Expand Down Expand Up @@ -392,9 +420,6 @@ static void
mqtt_tcptran_pipe_qos_send_cb(void *arg)
{
mqtt_tcptran_pipe *p = arg;
// int rv;
// nni_aio * aio;
// size_t n;
nni_msg *msg;
nni_aio *qsaio = p->qsaio;

Expand Down Expand Up @@ -577,6 +602,7 @@ mqtt_tcptran_pipe_recv_cb(void *arg)
iov.iov_buf = &p->txlen;
// send it down...
nni_aio_set_iov(p->rsaio, 1, &iov);
//Replace rsaio with qsaio too
nng_stream_send(p->conn, p->rsaio);
}
} else if (type == 0x50) {
Expand Down Expand Up @@ -628,6 +654,7 @@ mqtt_tcptran_pipe_recv_cb(void *arg)

nni_aio_set_msg(aio, msg);
nni_mtx_unlock(&p->mtx);

nni_aio_finish_sync(aio, 0, n);
return;

Expand Down Expand Up @@ -844,6 +871,8 @@ mqtt_tcptran_pipe_start(
NULL, NNI_TYPE_POINTER);
if (!connmsg) {
nni_list_append(&ep->waitpipes, p);
// 60s as the default keepalive timeout.
p->keepalive = 60 * 1000;
mqtt_tcptran_ep_match(ep);
return;
}
Expand All @@ -859,6 +888,7 @@ mqtt_tcptran_pipe_start(
p->wantrxhead = 2;
p->wanttxhead = nni_msg_header_len(connmsg) + nni_msg_len(connmsg);
p->rxmsg = NULL;
p->keepalive = nni_mqtt_msg_get_connect_keep_alive(connmsg) * 1000;

if (nni_msg_header_len(connmsg) > 0) {
iov[niov].iov_buf = nni_msg_header(connmsg);
Expand All @@ -873,6 +903,7 @@ mqtt_tcptran_pipe_start(
nni_aio_set_iov(p->negoaio, niov, iov);
nni_list_append(&ep->negopipes, p);


nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate
nng_stream_send(p->conn, p->negoaio);
}
Expand Down

0 comments on commit 567044c

Please sign in to comment.