Skip to content

Commit

Permalink
* NEW [quic/proto] A fast way to deal with allocating multiple quic s…
Browse files Browse the repository at this point in the history
…tream whiling quic msgs are cached thoughtfully.

Signed-off-by: wanghaemq <[email protected]>
  • Loading branch information
wanghaEMQ authored and JaylinYu committed Nov 16, 2023
1 parent 324deb5 commit b6acb5b
Showing 1 changed file with 37 additions and 7 deletions.
44 changes: 37 additions & 7 deletions src/mqtt/protocol/mqtt/mqtt_quic_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1304,9 +1304,38 @@ quic_mqtt_stream_bind(mqtt_sock_t *s, mqtt_pipe_t *p, nni_pipe *npipe)
}
nni_msg_clone(msg);
p->idmsg = msg;
log_info("New Pub Stream has been bound to topic (code %ld)", hash);
log_info("New Pub Stream has been bound to %.*s (code %ld)", topic_len, topic, hash);
// must be a new pub stream
mqtt_pipe_send_msg(nni_mqtt_msg_get_aio(msg), msg, p, 0);
if ((rv = mqtt_pipe_send_msg(nni_mqtt_msg_get_aio(msg), msg, p, 0)) >= 0) {
nni_aio_finish(nni_mqtt_msg_get_aio(msg), rv, 0);
}
char *num_ptr;
if (NULL != (num_ptr = nni_id_get(s->topic_map, hash))) {
// Iterate topic lmq and send
size_t lmqsz = nni_lmq_len(s->topic_lmq);
log_debug("topic_lmq sz%ld Cached msg number%d", lmqsz, (int)(num_ptr - (char *)s));
for (int i=0; i<(int)lmqsz && (void *)num_ptr != s; ++i) {
nng_msg *lm;
nni_lmq_get(s->topic_lmq, &lm);
if (lm == NULL)
continue;
if (nni_mqtt_msg_get_packet_type(lm) != NNG_MQTT_PUBLISH) {
nni_lmq_put(s->topic_lmq, lm);
continue;
}
uint32_t tpl;
char *tp = (char *) nni_mqtt_msg_get_publish_topic(lm, &tpl);
if (tpl == topic_len && 0 == strncmp(tp, topic, tpl)) {
if ((rv = mqtt_pipe_send_msg(nni_mqtt_msg_get_aio(lm), lm, p, 0)) >= 0) {
nni_aio_finish(nni_mqtt_msg_get_aio(lm), rv, 0);
}
num_ptr --;
continue;
}
nni_lmq_put(s->topic_lmq, lm);
}
nni_id_set(s->topic_map, hash, NULL);
}
}
return 0;
}
Expand Down Expand Up @@ -1756,13 +1785,14 @@ mqtt_quic_ctx_send(void *arg, nni_aio *aio)
log_error("Error in dialer start (sub) %d", rv);
}
} else if (nni_mqtt_msg_get_packet_type(msg) == NNG_MQTT_PUBLISH) {
// check if pub stream already exist
uint32_t topic_len;
char *topic = (char *) nni_mqtt_msg_get_publish_topic(msg, &topic_len);
char * num_ptr;
uint32_t topic_len;
char * topic = (char *) nni_mqtt_msg_get_publish_topic(msg, &topic_len);
mqtt_pipe_t *pub_pipe;
pub_pipe = nni_id_get(s->pub_streams, DJBHashn(topic, topic_len));
uint32_t hash = DJBHashn(topic, topic_len);
pub_pipe = nni_id_get(s->pub_streams, hash);
// check if pub stream already exist
if (pub_pipe == NULL) {
log_info("create new pub stream");
nni_lmq_put(s->topic_lmq, msg);
// check if the stream is already dialing
if (NULL == (num_ptr = nni_id_get(s->topic_map, hash))) {
Expand Down

0 comments on commit b6acb5b

Please sign in to comment.