Skip to content

Commit

Permalink
Move packet objects into handlers
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Beyer <[email protected]>
  • Loading branch information
matthiasbeyer committed Apr 5, 2024
1 parent 11e1f61 commit fb6d3f7
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
13 changes: 8 additions & 5 deletions src/client/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ pub(super) async fn handle_background_receiving(
.instrument(process_span)
.await?
}
mqtt_format::v5::packets::MqttPacket::Puback(mpuback) => {
handle_puback(mpuback, &inner, &packet)
mqtt_format::v5::packets::MqttPacket::Puback(_mpuback) => {
handle_puback(&packet.try_into().unwrap(), &inner)
.instrument(process_span)
.await?
}
Expand Down Expand Up @@ -159,10 +159,13 @@ async fn handle_pubcomp(
}

async fn handle_puback(
mpuback: &mqtt_format::v5::packets::puback::MPuback<'_>,
puback: &crate::packets::Puback,
inner: &Arc<Mutex<InnerClient>>,
packet: &MqttPacket,
) -> Result<(), ()> {
tracing::trace!("Calling on_qos1_acknowledge handler");
(inner.lock().await.default_handlers.on_qos1_acknowledge)(puback.clone());
let mpuback = puback.get();

match mpuback.reason {
mqtt_format::v5::packets::puback::PubackReasonCode::Success
| mqtt_format::v5::packets::puback::PubackReasonCode::NoMatchingSubscribers => {
Expand All @@ -185,7 +188,7 @@ async fn handle_puback(
tracing::trace!("Removed packet id from outstanding packets");

if let Some(callback) = inner.outstanding_callbacks.take_qos1(pident) {
if let Err(_) = callback.on_acknowledge.send(packet.clone()) {
if let Err(_) = callback.on_acknowledge.send(puback.clone()) {
tracing::trace!("Could not send ack, receiver was dropped.")
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/client/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ pub struct PacketIdentifierExhausted;
pub(crate) struct ClientHandlers {
pub(crate) on_packet_recv: OnPacketRecvFn,
pub(crate) on_qos1_acknowledge: OnQos1AcknowledgeFn,
// on_qos2_receive: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
// on_qos2_complete: Box<dyn Fn(&crate::packets::MqttPacket) + Send>,
// on_qos2_receive: Box<dyn Fn(crate::packets::MqttPacket) + Send>,
// on_qos2_complete: Box<dyn Fn(crate::packets::MqttPacket) + Send>,
}

pub type OnPacketRecvFn = Box<dyn Fn(&crate::packets::MqttPacket) + Send>;
pub type OnQos1AcknowledgeFn = Box<dyn Fn(&crate::packets::MqttPacket) + Send>;
pub type OnPacketRecvFn = Box<dyn Fn(crate::packets::MqttPacket) + Send>;
pub type OnQos1AcknowledgeFn = Box<dyn Fn(crate::packets::Puback) + Send>;

impl Default for ClientHandlers {
fn default() -> Self {
Expand Down Expand Up @@ -298,7 +298,7 @@ impl Callbacks {
}

pub(crate) struct Qos1Callbacks {
pub(crate) on_acknowledge: futures::channel::oneshot::Sender<crate::packets::MqttPacket>,
pub(crate) on_acknowledge: futures::channel::oneshot::Sender<crate::packets::Puback>,
}

pub(crate) struct Qos2ReceiveCallback {
Expand Down Expand Up @@ -341,7 +341,7 @@ enum PublishedReceiver {
}

pub struct PublishedQos1 {
recv: futures::channel::oneshot::Receiver<MqttPacket>,
recv: futures::channel::oneshot::Receiver<crate::packets::Puback>,
}

impl PublishedQos1 {
Expand Down

0 comments on commit fb6d3f7

Please sign in to comment.