Skip to content

Commit

Permalink
Merge pull request #310 from matthiasbeyer/fix-clippy
Browse files Browse the repository at this point in the history
Fix clippy
  • Loading branch information
matthiasbeyer authored Sep 9, 2024
2 parents a129968 + 01c2379 commit 73a33ff
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 24 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ categories = ["embedded"]
[workspace]
members = ["cloudmqtt-bin", "mqtt-format"]

[features]
debug = ["winnow/debug"]
#[features]
#debug = ["winnow/debug"]

[dependencies]
futures = "0.3.30"
Expand Down
3 changes: 1 addition & 2 deletions cloudmqtt-bin/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use cloudmqtt::client::connect::MqttClientConnector;
use cloudmqtt::client::send::Publish;
use cloudmqtt::client::MqttClient;
use cloudmqtt::transport::MqttConnectTransport;
use futures::FutureExt;
use tokio::net::TcpStream;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
Expand Down Expand Up @@ -64,7 +63,7 @@ async fn main() {
.await
.unwrap();
let connected = client.connect(connector).await.unwrap();
let background = tokio::task::spawn(connected.background_task);
let _background = tokio::task::spawn(connected.background_task);

client
.publish(Publish {
Expand Down
1 change: 0 additions & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::sync::Arc;

use futures::lock::Mutex;

use self::send::Acknowledge;
use self::send::Callbacks;
use self::send::ClientHandlers;
use self::state::ConnectState;
Expand Down
12 changes: 5 additions & 7 deletions src/client/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async fn handle_pubcomp(
tracing::trace!("Removed packet id from outstanding packets");

if let Some(callback) = inner.outstanding_callbacks.take_qos2_complete(pident) {
if let Err(_) = callback.on_complete.send(packet.clone()) {
if callback.on_complete.send(packet.clone()).is_err() {
tracing::trace!("Could not send ack, receiver was dropped.")
}
} else {
Expand Down Expand Up @@ -178,8 +178,7 @@ async fn handle_puback(
todo!()
};

let pident = PacketIdentifier::try_from(mpuback.packet_identifier)
.expect("Zero PacketIdentifier not valid here");
let pident = PacketIdentifier::from(mpuback.packet_identifier);
tracing::Span::current().record("packet_identifier", tracing::field::display(pident));

if session_state
Expand All @@ -190,7 +189,7 @@ async fn handle_puback(
tracing::trace!("Removed packet id from outstanding packets");

if let Some(callback) = inner.outstanding_callbacks.take_qos1(pident) {
if let Err(_) = callback.on_acknowledge.send(puback.clone()) {
if callback.on_acknowledge.send(puback.clone()).is_err() {
tracing::trace!("Could not send ack, receiver was dropped.")
}
}
Expand Down Expand Up @@ -225,8 +224,7 @@ async fn handle_pubrec(
tracing::error!("No session state found");
todo!()
};
let pident = PacketIdentifier::try_from(pubrec.packet_identifier)
.expect("zero PacketIdentifier not valid here");
let pident = PacketIdentifier::from(pubrec.packet_identifier);
tracing::Span::current().record("packet_identifier", tracing::field::display(pident));

if session_state
Expand Down Expand Up @@ -257,7 +255,7 @@ async fn handle_pubrec(
conn_state.conn_write.send(pubrel).await.map_err(drop)?;

if let Some(callback) = inner.outstanding_callbacks.take_qos2_receive(pident) {
if let Err(_) = callback.on_receive.send(packet.clone()) {
if callback.on_receive.send(packet.clone()).is_err() {
tracing::trace!("Could not send ack, receiver was dropped.")
}
} else {
Expand Down
18 changes: 6 additions & 12 deletions src/client/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use std::collections::HashMap;
use std::collections::VecDeque;

use futures::FutureExt;
use mqtt_format::v5::integers::VARIABLE_INTEGER_MAX;
use mqtt_format::v5::packets::publish::MPublish;
use tracing::Instrument;
Expand Down Expand Up @@ -221,6 +220,7 @@ pub(crate) struct ClientHandlers {
}

pub type OnPacketRecvFn = Box<dyn Fn(crate::packets::MqttPacket) + Send>;
pub type OnPacketRefRecvFn = Box<dyn Fn(&crate::packets::MqttPacket) + Send>;
pub type OnQos1AcknowledgeFn = Box<dyn Fn(crate::packets::Puback) + Send>;

impl Default for ClientHandlers {
Expand Down Expand Up @@ -313,7 +313,7 @@ pub struct Publish {
pub qos: QualityOfService,
pub retain: bool,
pub payload: MqttPayload,
pub on_packet_recv: Option<Box<dyn Fn(&crate::packets::MqttPacket) + Send>>,
pub on_packet_recv: Option<OnPacketRefRecvFn>,
}

pub struct Published {
Expand Down Expand Up @@ -379,14 +379,11 @@ pub struct PublishQos1 {
pub topic: crate::topic::MqttTopic,
pub retain: bool,
pub payload: MqttPayload,
on_packet_recv: Option<Box<dyn Fn(&crate::packets::MqttPacket) + Send>>,
on_packet_recv: Option<OnPacketRefRecvFn>,
}

impl PublishQos1 {
pub fn with_on_packet_recv(
mut self,
on_packet_recv: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
) -> Self {
pub fn with_on_packet_recv(mut self, on_packet_recv: OnPacketRefRecvFn) -> Self {
self.on_packet_recv = Some(on_packet_recv);
self
}
Expand All @@ -396,14 +393,11 @@ pub struct PublishQos2 {
pub topic: crate::topic::MqttTopic,
pub retain: bool,
pub payload: MqttPayload,
on_packet_recv: Option<Box<dyn Fn(&crate::packets::MqttPacket) + Send>>,
on_packet_recv: Option<OnPacketRefRecvFn>,
}

impl PublishQos2 {
pub fn with_on_packet_recv(
mut self,
on_packet_recv: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
) -> Self {
pub fn with_on_packet_recv(mut self, on_packet_recv: OnPacketRefRecvFn) -> Self {
self.on_packet_recv = Some(on_packet_recv);
self
}
Expand Down
3 changes: 3 additions & 0 deletions src/client/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl TransportWriter {
}
}

#[allow(unused)]
pub(super) struct ConnectState {
pub(super) session_present: bool,
pub(super) receive_maximum: Option<NonZeroU16>,
Expand All @@ -66,6 +67,7 @@ pub(super) struct ConnectState {
}

pub(super) struct SessionState {
#[allow(unused)]
pub(super) client_identifier: MqttString,
pub(super) outstanding_packets: OutstandingPackets,
}
Expand Down Expand Up @@ -111,6 +113,7 @@ impl OutstandingPackets {
self.outstanding_packets.contains_key(&ident)
}

#[allow(unused)]
pub fn iter_in_send_order(
&self,
) -> impl Iterator<Item = (PacketIdentifier, &crate::packets::MqttPacket)> {
Expand Down

0 comments on commit 73a33ff

Please sign in to comment.