Skip to content

Commit

Permalink
* SYNC [broker tls/tcp] sync broker_tcp/tls pro_ver
Browse files Browse the repository at this point in the history
Signed-off-by: wayne <[email protected]>
  • Loading branch information
StargazerWayne committed Dec 12, 2023
1 parent 6e1488b commit 97df8c0
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 20 deletions.
6 changes: 4 additions & 2 deletions src/sp/transport/mqtt/broker_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,11 @@ tcptran_pipe_nego_cb(void *arg)
conn_param_clone(p->tcp_cparam);

// Connection is accepted.
if (p->tcp_cparam->pro_ver == 5) {

p->pro_ver = p->tcp_cparam->pro_ver;
if (p->pro_ver == 5) {
p->qsend_quota = p->tcp_cparam->rx_max;
}
p->pro_ver = p->tcp_cparam->pro_ver;
nni_list_remove(&ep->negopipes, p);
nni_list_append(&ep->waitpipes, p);
tcptran_ep_match(ep);
Expand Down Expand Up @@ -1733,6 +1734,7 @@ tcptran_accept_cb(void *arg)
nni_mtx_lock(&ep->mtx);

if ((rv = nni_aio_result(aio)) != 0) {
log_warn(" send aio error %s", nng_strerror(rv));
goto error;
}

Expand Down
43 changes: 25 additions & 18 deletions src/sp/transport/mqtts/broker_tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ struct tlstran_pipe {
bool busy; // indicator for qos ack & aio
uint8_t txlen[NANO_MIN_PACKET_LEN];
uint8_t rxlen[NNI_NANO_MAX_HEADER_SIZE];
uint8_t pro_ver;
uint8_t *conn_buf;
uint8_t *qos_buf; // msg trunk for qos & V4/V5 conversion
nni_aio *txaio;
Expand Down Expand Up @@ -364,7 +365,9 @@ tlstran_pipe_nego_cb(void *arg)
// connection packet handled successfully. clone it for protocol or app layer
conn_param_clone(p->tcp_cparam);
// Connection is accepted.
if (p->tcp_cparam->pro_ver == 5) {

p->pro_ver = p->tcp_cparam->pro_ver;
if (p->pro_ver == 5) {
p->qsend_quota = p->tcp_cparam->rx_max;
}
nni_list_remove(&ep->negopipes, p);
Expand Down Expand Up @@ -412,7 +415,10 @@ tlstran_pipe_nego_cb(void *arg)
// error code. This is necessary to avoid a problem where the
// closed status is confused with the accept file descriptor
// being closed.
conn_param_free(p->tcp_cparam);
if (p->tcp_cparam) {
conn_param_free(p->tcp_cparam);
p->tcp_cparam = NULL;
}
if (rv == NNG_ECLOSED) {
rv = NNG_ECONNSHUT;
}
Expand Down Expand Up @@ -451,7 +457,7 @@ tlstran_pipe_qos_send_cb(void *arg)
nni_mtx_unlock(&p->mtx);
return;
}
msg = nni_aio_get_msg(p->qsaio);
msg = nni_aio_get_msg(qsaio);
if (msg != NULL)
type = nni_msg_cmd_type(msg);
else {
Expand All @@ -460,7 +466,7 @@ tlstran_pipe_qos_send_cb(void *arg)
return;
}

if (p->tcp_cparam->pro_ver == 5) {
if (p->pro_ver == 5) {
(type == CMD_PUBCOMP || type == PUBACK) ? p->qrecv_quota++
: p->qrecv_quota;
}
Expand Down Expand Up @@ -534,11 +540,11 @@ tlstran_pipe_send_cb(void *arg)

if (nni_aio_get_prov_data(txaio) != NULL) {
// msgs left behind due to multiple topics matched
if (p->tcp_cparam->pro_ver == 4)
if (p->pro_ver == 4)
tlstran_pipe_send_start_v4(p, msg, txaio);
else if (p->tcp_cparam->pro_ver == 5)
else if (p->pro_ver == 5)
tlstran_pipe_send_start_v5(p, msg, txaio);
else if (p->tcp_cparam->pro_ver == 3)
else if (p->pro_ver == 3)
tlstran_pipe_send_start_v4(p, msg, txaio);
else {
log_error("pro_ver of the msg is not 3, 4 or 5.");
Expand Down Expand Up @@ -739,8 +745,7 @@ tlstran_pipe_recv_cb(void *arg)
if (qos_pac > 0) {
// flow control, check rx_max
// recv_quota as length of lmq
// TODO add pro_ver in tlstran_pipe
if (p->tcp_cparam->pro_ver == 5) {
if (p->pro_ver == 5) {
if (p->qrecv_quota > 0) {
p->qrecv_quota--;
} else {
Expand All @@ -766,29 +771,29 @@ tlstran_pipe_recv_cb(void *arg)
}
} else if (type == CMD_PUBREC) {
if ((rv = nni_mqtt_pubres_decode(msg, &packet_id, &reason_code, &prop,
cparam->pro_ver)) != 0) {
p->pro_ver)) != 0) {
log_error("decode PUBREC variable header failed!");
goto recv_error;
}
ack_cmd = CMD_PUBREL;
ack = true;
} else if (type == CMD_PUBREL) {
if ((rv = nni_mqtt_pubres_decode(msg, &packet_id, &reason_code, &prop,
cparam->pro_ver)) != 0) {
p->pro_ver)) != 0) {
log_error("decode PUBREL variable header failed!");
goto recv_error;
}
ack_cmd = CMD_PUBCOMP;
ack = true;
} else if (type == CMD_PUBACK || type == CMD_PUBCOMP) {
if ((rv = nni_mqtt_pubres_decode(msg, &packet_id, &reason_code, &prop,
cparam->pro_ver)) != 0) {
p->pro_ver)) != 0) {
log_error("decode PUBACK or PUBCOMP variable header "
"failed!");
goto recv_error;
}
// MQTT V5 flow control
if (cparam->pro_ver == 5) {
if (p->pro_ver == 5) {
log_debug("free property & reduce send quota");
property_free(prop);
p->qsend_quota++;
Expand All @@ -806,7 +811,7 @@ tlstran_pipe_recv_cb(void *arg)

nni_msg_set_cmd_type(qmsg, ack_cmd);
nni_mqtt_msgack_encode(
qmsg, packet_id, reason_code, prop, cparam->pro_ver);
qmsg, packet_id, reason_code, prop, p->pro_ver);
property_free(prop);
nni_mqtt_pubres_header_encode(qmsg, ack_cmd);
// if (prop != NULL) {
Expand Down Expand Up @@ -1470,12 +1475,14 @@ tlstran_pipe_send_start(tlstran_pipe *p)
nni_aio_finish(aio, NNG_ECANCELED, 0);
return;
}
if (p->tcp_cparam->pro_ver == 4) {
if (p->pro_ver == 4) {
tlstran_pipe_send_start_v4(p, msg, aio);
} else if (p->tcp_cparam->pro_ver == 5) {
} else if (p->pro_ver == 5) {
tlstran_pipe_send_start_v5(p, msg, aio);
} else if (p->tcp_cparam->pro_ver == 3) {
} else if (p->pro_ver == 3) {
tlstran_pipe_send_start_v4(p, msg, aio);
} else {
nni_aio_finish_error(aio, NNG_EPROTO);
}
return;
}
Expand Down Expand Up @@ -1940,7 +1947,7 @@ tlstran_ep_set_conf(void *arg, const void *v, size_t sz, nni_opt_type t)
tlstran_ep *ep = arg;
NNI_ARG_UNUSED(sz);
NNI_ARG_UNUSED(t);

nni_mtx_lock(&ep->mtx);
ep->conf = (conf *) v;
nni_mtx_unlock(&ep->mtx);
Expand Down

0 comments on commit 97df8c0

Please sign in to comment.