Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix clippy #310

Merged
merged 8 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ categories = ["embedded"]
[workspace]
members = ["cloudmqtt-bin", "mqtt-format"]

[features]
debug = ["winnow/debug"]
#[features]
#debug = ["winnow/debug"]

[dependencies]
futures = "0.3.30"
Expand Down
3 changes: 1 addition & 2 deletions cloudmqtt-bin/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use cloudmqtt::client::connect::MqttClientConnector;
use cloudmqtt::client::send::Publish;
use cloudmqtt::client::MqttClient;
use cloudmqtt::transport::MqttConnectTransport;
use futures::FutureExt;
use tokio::net::TcpStream;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
Expand Down Expand Up @@ -64,7 +63,7 @@ async fn main() {
.await
.unwrap();
let connected = client.connect(connector).await.unwrap();
let background = tokio::task::spawn(connected.background_task);
let _background = tokio::task::spawn(connected.background_task);

client
.publish(Publish {
Expand Down
1 change: 0 additions & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::sync::Arc;

use futures::lock::Mutex;

use self::send::Acknowledge;
use self::send::Callbacks;
use self::send::ClientHandlers;
use self::state::ConnectState;
Expand Down
12 changes: 5 additions & 7 deletions src/client/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async fn handle_pubcomp(
tracing::trace!("Removed packet id from outstanding packets");

if let Some(callback) = inner.outstanding_callbacks.take_qos2_complete(pident) {
if let Err(_) = callback.on_complete.send(packet.clone()) {
if callback.on_complete.send(packet.clone()).is_err() {
tracing::trace!("Could not send ack, receiver was dropped.")
}
} else {
Expand Down Expand Up @@ -178,8 +178,7 @@ async fn handle_puback(
todo!()
};

let pident = PacketIdentifier::try_from(mpuback.packet_identifier)
.expect("Zero PacketIdentifier not valid here");
let pident = PacketIdentifier::from(mpuback.packet_identifier);
tracing::Span::current().record("packet_identifier", tracing::field::display(pident));

if session_state
Expand All @@ -190,7 +189,7 @@ async fn handle_puback(
tracing::trace!("Removed packet id from outstanding packets");

if let Some(callback) = inner.outstanding_callbacks.take_qos1(pident) {
if let Err(_) = callback.on_acknowledge.send(puback.clone()) {
if callback.on_acknowledge.send(puback.clone()).is_err() {
tracing::trace!("Could not send ack, receiver was dropped.")
}
}
Expand Down Expand Up @@ -225,8 +224,7 @@ async fn handle_pubrec(
tracing::error!("No session state found");
todo!()
};
let pident = PacketIdentifier::try_from(pubrec.packet_identifier)
.expect("zero PacketIdentifier not valid here");
let pident = PacketIdentifier::from(pubrec.packet_identifier);
tracing::Span::current().record("packet_identifier", tracing::field::display(pident));

if session_state
Expand Down Expand Up @@ -257,7 +255,7 @@ async fn handle_pubrec(
conn_state.conn_write.send(pubrel).await.map_err(drop)?;

if let Some(callback) = inner.outstanding_callbacks.take_qos2_receive(pident) {
if let Err(_) = callback.on_receive.send(packet.clone()) {
if callback.on_receive.send(packet.clone()).is_err() {
tracing::trace!("Could not send ack, receiver was dropped.")
}
} else {
Expand Down
18 changes: 6 additions & 12 deletions src/client/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use std::collections::HashMap;
use std::collections::VecDeque;

use futures::FutureExt;
use mqtt_format::v5::integers::VARIABLE_INTEGER_MAX;
use mqtt_format::v5::packets::publish::MPublish;
use tracing::Instrument;
Expand Down Expand Up @@ -221,6 +220,7 @@ pub(crate) struct ClientHandlers {
}

pub type OnPacketRecvFn = Box<dyn Fn(crate::packets::MqttPacket) + Send>;
pub type OnPacketRefRecvFn = Box<dyn Fn(&crate::packets::MqttPacket) + Send>;
pub type OnQos1AcknowledgeFn = Box<dyn Fn(crate::packets::Puback) + Send>;

impl Default for ClientHandlers {
Expand Down Expand Up @@ -313,7 +313,7 @@ pub struct Publish {
pub qos: QualityOfService,
pub retain: bool,
pub payload: MqttPayload,
pub on_packet_recv: Option<Box<dyn Fn(&crate::packets::MqttPacket) + Send>>,
pub on_packet_recv: Option<OnPacketRefRecvFn>,
}

pub struct Published {
Expand Down Expand Up @@ -379,14 +379,11 @@ pub struct PublishQos1 {
pub topic: crate::topic::MqttTopic,
pub retain: bool,
pub payload: MqttPayload,
on_packet_recv: Option<Box<dyn Fn(&crate::packets::MqttPacket) + Send>>,
on_packet_recv: Option<OnPacketRefRecvFn>,
}

impl PublishQos1 {
pub fn with_on_packet_recv(
mut self,
on_packet_recv: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
) -> Self {
pub fn with_on_packet_recv(mut self, on_packet_recv: OnPacketRefRecvFn) -> Self {
self.on_packet_recv = Some(on_packet_recv);
self
}
Expand All @@ -396,14 +393,11 @@ pub struct PublishQos2 {
pub topic: crate::topic::MqttTopic,
pub retain: bool,
pub payload: MqttPayload,
on_packet_recv: Option<Box<dyn Fn(&crate::packets::MqttPacket) + Send>>,
on_packet_recv: Option<OnPacketRefRecvFn>,
}

impl PublishQos2 {
pub fn with_on_packet_recv(
mut self,
on_packet_recv: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
) -> Self {
pub fn with_on_packet_recv(mut self, on_packet_recv: OnPacketRefRecvFn) -> Self {
self.on_packet_recv = Some(on_packet_recv);
self
}
Expand Down
3 changes: 3 additions & 0 deletions src/client/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl TransportWriter {
}
}

#[allow(unused)]
pub(super) struct ConnectState {
pub(super) session_present: bool,
pub(super) receive_maximum: Option<NonZeroU16>,
Expand All @@ -66,6 +67,7 @@ pub(super) struct ConnectState {
}

pub(super) struct SessionState {
#[allow(unused)]
pub(super) client_identifier: MqttString,
pub(super) outstanding_packets: OutstandingPackets,
}
Expand Down Expand Up @@ -111,6 +113,7 @@ impl OutstandingPackets {
self.outstanding_packets.contains_key(&ident)
}

#[allow(unused)]
pub fn iter_in_send_order(
&self,
) -> impl Iterator<Item = (PacketIdentifier, &crate::packets::MqttPacket)> {
Expand Down