Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add v5 writing #234

Merged
merged 27 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
60e6856
Add WriteMqttPacket trait abstraction
matthiasbeyer Mar 21, 2024
6cf960f
Implement MFixedHeader::write()
matthiasbeyer Mar 21, 2024
049b644
Implement write() fn for all combined reason code types
matthiasbeyer Mar 21, 2024
94fe602
Implement string writing
matthiasbeyer Mar 21, 2024
5f30efa
Implement PacketIdentifier::write()
matthiasbeyer Mar 21, 2024
201423b
Implement write_variable_u32
matthiasbeyer Mar 21, 2024
1b0a75a
Add trace in case of unknown property id
matthiasbeyer Mar 21, 2024
10c0284
Fix: Take whole slice for properties
matthiasbeyer Mar 21, 2024
ab2e115
Implement writing of properties
matthiasbeyer Mar 21, 2024
b95c236
Add fns to get binary size of objects
matthiasbeyer Mar 21, 2024
31cf1ca
Implement write() for all generated property types
matthiasbeyer Mar 21, 2024
a8e1ff0
Implement MqttPacket::write()
matthiasbeyer Mar 21, 2024
7cf0ec9
Implement MUnsubscribe::write
TheNeikos Mar 21, 2024
ca8de0a
Implement MUnsuback::write
TheNeikos Mar 21, 2024
2f59046
Implement MSubscribe::write
TheNeikos Mar 21, 2024
9fb6774
Implement MSuback::write
TheNeikos Mar 21, 2024
f9fff59
Implement MPubrel::write
TheNeikos Mar 21, 2024
fd6b474
Implement MPubrec::write
TheNeikos Mar 21, 2024
2986d1c
Implement MPublish::write
TheNeikos Mar 21, 2024
804cb41
Implement MPubcomp::write
TheNeikos Mar 21, 2024
bed0308
Implement MAuth::write
matthiasbeyer Mar 21, 2024
526c51e
Implement MConnack::write
matthiasbeyer Mar 21, 2024
961aea2
Implement MConnect::write
matthiasbeyer Mar 21, 2024
e64b448
Implement MDisconnect::write
matthiasbeyer Mar 21, 2024
f09b85d
Implement MPingreq::write
matthiasbeyer Mar 21, 2024
33ad5d4
Implement MPingresp::write
matthiasbeyer Mar 21, 2024
8123c25
Implement MPuback::write
matthiasbeyer Mar 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions mqtt-format/src/v5/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use winnow::binary::length_take;
use winnow::Bytes;
use winnow::Parser;

use super::write::WResult;
use super::write::WriteMqttPacket;
use super::MResult;

pub fn parse_binary_data<'i>(input: &mut &'i Bytes) -> MResult<&'i [u8]> {
Expand All @@ -18,11 +20,28 @@ pub fn parse_binary_data<'i>(input: &mut &'i Bytes) -> MResult<&'i [u8]> {
.parse_next(input)
}

pub async fn write_binary_data<W: WriteMqttPacket>(buffer: &mut W, slice: &[u8]) -> WResult<W> {
let slice_len = slice
.len()
.try_into()
.map_err(|_| W::Error::from(super::write::MqttWriteError::Invariant))?;

buffer.write_u16(slice_len).await?;
buffer.write_slice(slice).await
}

#[inline]
pub fn binary_data_binary_size(data: &[u8]) -> u32 {
(2 + data.len()) as u32
}

#[cfg(test)]
mod tests {
use winnow::Bytes;

use crate::v5::bytes::parse_binary_data;
use crate::v5::bytes::write_binary_data;
use crate::v5::test::TestWriter;

#[test]
fn check_binary_data() {
Expand All @@ -33,4 +52,15 @@ mod tests {
&[0x4, 0x2]
);
}

#[tokio::test]
async fn test_write_binary_data() {
let mut writer = TestWriter { buffer: Vec::new() };
let data = &[0xFF, 0xAB, 0x42, 0x13, 0x37, 0x69];

write_binary_data(&mut writer, data).await.unwrap();
let out = parse_binary_data(&mut Bytes::new(&writer.buffer)).unwrap();

assert_eq!(out, data);
}
}
36 changes: 36 additions & 0 deletions mqtt-format/src/v5/fixed_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use winnow::error::ParserError;
use winnow::Bytes;
use winnow::Parser;

use super::write::WResult;
use super::write::WriteMqttPacket;
use super::MResult;

#[derive(num_enum::TryFromPrimitive, num_enum::IntoPrimitive)]
Expand Down Expand Up @@ -101,6 +103,40 @@ impl MFixedHeader {

Ok(MFixedHeader { packet_type })
}

pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
#[allow(clippy::identity_op)]
let byte = match self.packet_type {
PacketType::Connect => (1 << 4) | 0,
PacketType::Connack => (2 << 4) | 0,
PacketType::Publish { dup, qos, retain } => {
let upper = 3 << 4;
let lower = {
let dup = (dup as u8) << 3;
let qos = (qos as u8) << 1;
let retain = retain as u8;

dup | qos | retain
};

upper | lower
}
PacketType::Puback => (4 << 4) | 0,
PacketType::Pubrec => (5 << 4) | 0,
PacketType::Pubrel => (6 << 4) | 0b0010,
PacketType::Pubcomp => (7 << 4) | 0,
PacketType::Subscribe => (8 << 4) | 0b0010,
PacketType::Suback => (9 << 4) | 0,
PacketType::Unsubscribe => (10 << 4) | 0b0010,
PacketType::Unsuback => (11 << 4) | 0,
PacketType::Pingreq => (12 << 4) | 0,
PacketType::Pingresp => (13 << 4) | 0,
PacketType::Disconnect => (14 << 4) | 0,
PacketType::Auth => (15 << 4) | 0,
};

buffer.write_byte(byte).await
}
}

#[cfg(test)]
Expand Down
104 changes: 104 additions & 0 deletions mqtt-format/src/v5/integers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use winnow::token::take_while;
use winnow::Bytes;
use winnow::Parser;

use super::write::WResult;
use super::write::WriteMqttPacket;
use super::MResult;

/// Parse a u16
Expand All @@ -26,6 +28,11 @@ pub fn parse_u16(input: &mut &Bytes) -> MResult<u16> {
.parse_next(input)
}

pub async fn write_u16<W: WriteMqttPacket>(buffer: &mut W, u: u16) -> WResult<W> {
buffer.write_u16(u.to_be()).await?;
Ok(())
}

/// Parse a u32
///
/// MQTT expects their numbers in big-endian
Expand All @@ -38,6 +45,11 @@ pub fn parse_u32(input: &mut &Bytes) -> MResult<u32> {
.parse_next(input)
}

pub async fn write_u32<W: WriteMqttPacket>(buffer: &mut W, u: u32) -> WResult<W> {
buffer.write_u32(u.to_be()).await?;
Ok(())
}

/// Parse a variable sized integer
///
/// Value range: `0..268_435_455`
Expand All @@ -63,13 +75,65 @@ pub fn parse_variable_u32(input: &mut &Bytes) -> MResult<u32> {
.parse_next(input)
}

#[inline]
pub const fn variable_u32_binary_size(u: u32) -> u32 {
match u {
0..=127 => 1,
128..=16383 => 2,
16384..=2_097_151 => 3,
2_097_152..=268_435_455 => 4,
_size => unreachable!(),
}
}

pub async fn write_variable_u32<W: WriteMqttPacket>(buffer: &mut W, u: u32) -> WResult<W> {
match u {
0..=127 => {
buffer.write_byte(u as u8).await?;
}
len @ 128..=16383 => {
let first = (len % 128) | 0b1000_0000;
let second = len / 128;
buffer.write_byte(first as u8).await?;
buffer.write_byte(second as u8).await?;
}
len @ 16384..=2_097_151 => {
let first = (len % 128) | 0b1000_0000;
let second = ((len / 128) % 128) | 0b1000_0000;
let third = len / (128 * 128);

buffer.write_byte(first as u8).await?;
buffer.write_byte(second as u8).await?;
buffer.write_byte(third as u8).await?;
}
len @ 2_097_152..=268_435_455 => {
let first = (len % 128) | 0b1000_0000;
let second = ((len / 128) % 128) | 0b1000_0000;
let third = ((len / (128 * 128)) % 128) | 0b1000_0000;
let fourth = len / (128 * 128 * 128);

buffer.write_byte(first as u8).await?;
buffer.write_byte(second as u8).await?;
buffer.write_byte(third as u8).await?;
buffer.write_byte(fourth as u8).await?;
}
_size => {
return Err(super::write::MqttWriteError::Invariant.into());
}
}
Ok(())
}

#[cfg(test)]
mod tests {
use winnow::Bytes;

use crate::v5::integers::parse_u16;
use crate::v5::integers::parse_u32;
use crate::v5::integers::parse_variable_u32;
use crate::v5::integers::write_variable_u32;
use crate::v5::test::TestWriter;
use crate::v5::write::WriteMqttPacket;

#[test]
fn check_integer_parsing() {
Expand Down Expand Up @@ -118,4 +182,44 @@ mod tests {
let input = [0xFF, 0xFF, 0xFF, 0x8F];
parse_variable_u32(&mut Bytes::new(&input)).unwrap_err();
}

#[tokio::test]
async fn test_write_byte() {
let mut writer = TestWriter { buffer: Vec::new() };

writer.write_byte(1).await.unwrap();

assert_eq!(writer.buffer, &[1]);
}

#[tokio::test]
async fn test_write_two_bytes() {
let mut writer = TestWriter { buffer: Vec::new() };

writer.write_u16(1).await.unwrap();

assert_eq!(writer.buffer, &[0x00, 0x01]);
}

#[tokio::test]
async fn test_write_four_bytes() {
let mut writer = TestWriter { buffer: Vec::new() };

writer.write_u32(1).await.unwrap();

assert_eq!(writer.buffer, &[0x00, 0x00, 0x00, 0x01]);
}

#[tokio::test]
async fn test_write_variable_u32() {
// step by some prime number
for i in (0..268_435_455).step_by(271) {
let mut writer = TestWriter { buffer: Vec::new() };

write_variable_u32(&mut writer, i).await.unwrap();

let out = parse_variable_u32(&mut Bytes::new(&writer.buffer)).unwrap();
assert_eq!(out, i);
}
}
}
4 changes: 4 additions & 0 deletions mqtt-format/src/v5/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,9 @@ pub mod reason_code;
pub mod strings;
mod util;
pub mod variable_header;
pub mod write;

#[cfg(test)]
pub mod test;

pub type MResult<O> = winnow::PResult<O>;
7 changes: 7 additions & 0 deletions mqtt-format/src/v5/packets/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crate::v5::variable_header::AuthenticationData;
use crate::v5::variable_header::AuthenticationMethod;
use crate::v5::variable_header::ReasonString;
use crate::v5::variable_header::UserProperties;
use crate::v5::write::WResult;
use crate::v5::write::WriteMqttPacket;
use crate::v5::MResult;

crate::v5::reason_code::make_combined_reason_code! {
Expand Down Expand Up @@ -56,4 +58,9 @@ impl<'i> MAuth<'i> {
})
.parse_next(input)
}

pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
self.reason.write(buffer).await?;
self.properties.write(buffer).await
}
}
9 changes: 9 additions & 0 deletions mqtt-format/src/v5/packets/connack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use crate::v5::variable_header::SubscriptionIdentifiersAvailable;
use crate::v5::variable_header::TopicAliasMaximum;
use crate::v5::variable_header::UserProperties;
use crate::v5::variable_header::WildcardSubscriptionAvailable;
use crate::v5::write::WResult;
use crate::v5::write::WriteMqttPacket;
use crate::v5::MResult;

crate::v5::reason_code::make_combined_reason_code! {
Expand Down Expand Up @@ -145,4 +147,11 @@ impl<'i> MConnack<'i> {
})
.parse_next(input)
}

pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
let byte = (self.session_present as u8) << 7;
buffer.write_byte(byte).await?;
self.reason_code.write(buffer).await?;
self.properties.write(buffer).await
}
}
61 changes: 61 additions & 0 deletions mqtt-format/src/v5/packets/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use crate::v5::variable_header::SessionExpiryInterval;
use crate::v5::variable_header::TopicAliasMaximum;
use crate::v5::variable_header::UserProperties;
use crate::v5::variable_header::WillDelayInterval;
use crate::v5::write::WResult;
use crate::v5::write::WriteMqttPacket;
use crate::v5::MResult;

#[derive(Debug)]
Expand Down Expand Up @@ -161,6 +163,49 @@ impl<'i> MConnect<'i> {
})
.parse_next(input)
}

pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
crate::v5::strings::write_string(buffer, "MQTT").await?;
ProtocolLevel::V5.write(buffer).await?;

let flags = {
let reserved = 0;
let clean_start = (self.clean_start as u8) << 1;
let will = {
if let Some(will) = self.will.as_ref() {
let will_flag = 1 << 2;
let will_qos = {
let qos: u8 = will.will_qos.into();
qos << 3
};
let will_retain = (will.will_retain as u8) << 5;
will_flag | will_qos | will_retain
} else {
0
}
};
let password = (self.password.is_some() as u8) << 6;
let username = (self.username.is_some() as u8) << 7;

reserved | clean_start | will | password | username
};

buffer.write_byte(flags).await?;
buffer.write_u16(self.keep_alive).await?;
self.properties.write(buffer).await?;
crate::v5::strings::write_string(buffer, self.client_identifier).await?;
if let Some(will) = self.will.as_ref() {
will.write(buffer).await?;
}
if let Some(username) = self.username.as_ref() {
crate::v5::strings::write_string(buffer, username).await?;
}
if let Some(password) = self.password.as_ref() {
crate::v5::bytes::write_binary_data(buffer, password).await?;
}

Ok(())
}
}

#[derive(Debug)]
Expand All @@ -172,6 +217,15 @@ pub struct Will<'i> {
pub will_retain: bool,
}

impl<'i> Will<'i> {
pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
self.properties.write(buffer).await?;
crate::v5::strings::write_string(buffer, self.topic).await?;
crate::v5::bytes::write_binary_data(buffer, self.payload).await?;
Ok(())
}
}

crate::v5::properties::define_properties! {
pub struct ConnectWillProperties<'i> {
will_delay_interval: WillDelayInterval,
Expand Down Expand Up @@ -200,4 +254,11 @@ impl ProtocolLevel {
)),
}
}

pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
match self {
ProtocolLevel::V3 => buffer.write_byte(3).await,
ProtocolLevel::V5 => buffer.write_byte(5).await,
}
}
}
Loading
Loading