diff --git a/src/mqtt/transport/tcp/mqtt_tcp.c b/src/mqtt/transport/tcp/mqtt_tcp.c index 53dbc6eeb..04413757d 100644 --- a/src/mqtt/transport/tcp/mqtt_tcp.c +++ b/src/mqtt/transport/tcp/mqtt_tcp.c @@ -31,8 +31,10 @@ struct mqtt_tcptran_pipe { nni_pipe * npipe; uint16_t peer; uint16_t proto; + uint16_t keepalive; //ms size_t rcvmax; bool closed; + bool busy; nni_list_node node; mqtt_tcptran_ep *ep; nni_atomic_flag reaped; @@ -45,17 +47,16 @@ struct mqtt_tcptran_pipe { size_t wantrxhead; nni_list recvq; nni_list sendq; + nni_aio tmaio; nni_aio * txaio; nni_aio * rxaio; nni_aio * rsaio; // aio for puback/pubrecv - nni_aio * qsaio; // aio for pubrel - nni_aio * rpaio; // aio for pubcomp + nni_aio * qsaio; // aio for pubrel/pingreq nni_lmq rslmq; nni_aio * negoaio; nni_msg * rxmsg; nni_msg * smsg; nni_mtx mtx; - bool busy; }; struct mqtt_tcptran_ep { @@ -128,6 +129,32 @@ nni_msg_get_pub_pid(nni_msg *m) return pid; } +static void +mqtt_pipe_timer_cb(void *arg) +{ + mqtt_tcptran_pipe *p = arg; + uint8_t buf[2]; + + if (nng_aio_result(&p->tmaio) != 0) { + return; + } + nni_mtx_lock(&p->mtx); + if (!p->busy) { + // send pingreq + buf[0] = 0xC0; + buf[1] = 0x00; + + nni_iov iov; + iov.iov_len = 2; + iov.iov_buf = &buf; + // send it down... + nni_aio_set_iov(p->qsaio, 1, &iov); + nng_stream_send(p->conn, p->qsaio); + } + nni_mtx_unlock(&p->mtx); + nni_sleep_aio(p->keepalive, &p->tmaio); +} + static void mqtt_tcptran_pipe_close(void *arg) { @@ -141,8 +168,8 @@ mqtt_tcptran_pipe_close(void *arg) nni_aio_close(p->rsaio); nni_aio_close(p->qsaio); nni_aio_close(p->txaio); - nni_aio_close(p->rpaio); nni_aio_close(p->negoaio); + nni_aio_close(&p->tmaio); nng_stream_close(p->conn); } @@ -155,9 +182,9 @@ mqtt_tcptran_pipe_stop(void *arg) nni_aio_stop(p->rxaio); nni_aio_stop(p->rsaio); nni_aio_stop(p->qsaio); - nni_aio_stop(p->rpaio); nni_aio_stop(p->txaio); nni_aio_stop(p->negoaio); + nni_aio_stop(&p->tmaio); } static int @@ -167,7 +194,9 @@ mqtt_tcptran_pipe_init(void *arg, nni_pipe *npipe) p->npipe = npipe; nni_lmq_init(&p->rslmq, 1024); // FIXME: remove hard code value + nni_aio_init(&p->tmaio, mqtt_pipe_timer_cb, p); p->busy = false; + nni_sleep_aio(p->keepalive, &p->tmaio); return (0); } @@ -192,12 +221,12 @@ mqtt_tcptran_pipe_fini(void *arg) nni_aio_free(p->txaio); nni_aio_free(p->rsaio); nni_aio_free(p->qsaio); - nni_aio_free(p->rpaio); nni_aio_free(p->negoaio); nng_stream_free(p->conn); nni_msg_free(p->rxmsg); nni_lmq_fini(&p->rslmq); nni_mtx_fini(&p->mtx); + nni_aio_fini(&p->tmaio); NNI_FREE_STRUCT(p); } @@ -229,7 +258,6 @@ mqtt_tcptran_pipe_alloc(mqtt_tcptran_pipe **pipep) ((rv = nni_aio_alloc(&p->rsaio, NULL, p)) != 0) || ((rv = nni_aio_alloc( &p->qsaio, mqtt_tcptran_pipe_qos_send_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->rpaio, NULL, p)) != 0) || ((rv = nni_aio_alloc(&p->negoaio, mqtt_tcptran_pipe_nego_cb, p)) != 0)) { mqtt_tcptran_pipe_fini(p); @@ -392,9 +420,6 @@ static void mqtt_tcptran_pipe_qos_send_cb(void *arg) { mqtt_tcptran_pipe *p = arg; - // int rv; - // nni_aio * aio; - // size_t n; nni_msg *msg; nni_aio *qsaio = p->qsaio; @@ -577,6 +602,7 @@ mqtt_tcptran_pipe_recv_cb(void *arg) iov.iov_buf = &p->txlen; // send it down... nni_aio_set_iov(p->rsaio, 1, &iov); + //Replace rsaio with qsaio too nng_stream_send(p->conn, p->rsaio); } } else if (type == 0x50) { @@ -628,6 +654,7 @@ mqtt_tcptran_pipe_recv_cb(void *arg) nni_aio_set_msg(aio, msg); nni_mtx_unlock(&p->mtx); + nni_aio_finish_sync(aio, 0, n); return; @@ -844,6 +871,8 @@ mqtt_tcptran_pipe_start( NULL, NNI_TYPE_POINTER); if (!connmsg) { nni_list_append(&ep->waitpipes, p); + // 60s as the default keepalive timeout. + p->keepalive = 60 * 1000; mqtt_tcptran_ep_match(ep); return; } @@ -859,6 +888,7 @@ mqtt_tcptran_pipe_start( p->wantrxhead = 2; p->wanttxhead = nni_msg_header_len(connmsg) + nni_msg_len(connmsg); p->rxmsg = NULL; + p->keepalive = nni_mqtt_msg_get_connect_keep_alive(connmsg) * 1000; if (nni_msg_header_len(connmsg) > 0) { iov[niov].iov_buf = nni_msg_header(connmsg); @@ -873,6 +903,7 @@ mqtt_tcptran_pipe_start( nni_aio_set_iov(p->negoaio, niov, iov); nni_list_append(&ep->negopipes, p); + nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate nng_stream_send(p->conn, p->negoaio); }