Skip to content

Commit

Permalink
mqtt: do not own settings
Browse files Browse the repository at this point in the history
in support and exploration of
quartiq/stabilizer#862
  • Loading branch information
jordens committed Apr 3, 2024
1 parent 18e56cc commit 9627c6b
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Python lib signatures have changed (Miniconf.create(), discover())
* Python lib discovery timeout has been optimized to work well for both slow
connections (high RTT) and fast ones
* The MQTT client does not own the minitonf settings struct anymore.

### Added

Expand Down
8 changes: 4 additions & 4 deletions examples/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,17 @@ async fn main() {
Stack,
"sample/prefix",
StandardClock::default(),
Settings::default(),
minimq::ConfigBuilder::new(localhost.into(), &mut buffer).keepalive_interval(60),
)
.unwrap();

let mut settings = Settings::default();
loop {
if client.update().unwrap() {
println!("Settings updated: {:?}", client.settings());
if client.update(&mut settings).unwrap() {
println!("Settings updated: {:?}", settings);
}

if client.settings().exit {
if settings.exit {
break;
}

Expand Down
64 changes: 30 additions & 34 deletions src/mqtt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,30 +151,29 @@ struct ListCache {
/// std_embedded_nal::Stack::default(),
/// "quartiq/application/12345", // prefix
/// std_embedded_time::StandardClock::default(),
/// Settings::default(),
/// minimq::ConfigBuilder::new(localhost.into(), &mut buffer).keepalive_interval(60),
/// )
/// .unwrap();
/// let mut settings = Settings::default();
///
/// client
/// .handled_update(|path, old_settings, new_settings| {
/// .handled_update(&mut settings, |path, settings, new_settings| {
/// if new_settings.foo {
/// return Err("Foo!");
/// }
/// *old_settings = new_settings.clone();
/// *settings = new_settings;
/// Ok(())
/// })
/// .unwrap();
/// ```
pub struct MqttClient<'buf, Settings, Stack, Clock, Broker, const Y: usize>
where
Settings: TreeKey<Y> + Clone,
Settings: TreeKey<Y>,
Stack: TcpClientStack,
Clock: embedded_time::Clock,
Broker: minimq::Broker,
{
mqtt: minimq::Minimq<'buf, Stack, Clock, Broker>,
settings: Settings,
state: sm::StateMachine<sm::Context<Clock, Settings, Y>>,
prefix: String<MAX_TOPIC_LENGTH>,
listing_state: Option<(ListCache, Iter<Settings, Y>)>,
Expand All @@ -194,13 +193,11 @@ where
/// * `stack` - The network stack to use for communication.
/// * `prefix` - The MQTT device prefix to use for this device.
/// * `clock` - The clock for managing the MQTT connection.
/// * `settings` - The initial settings values.
/// * `config` - The configuration of the MQTT client.
pub fn new(
stack: Stack,
prefix: &str,
clock: Clock,
settings: Settings,
config: minimq::ConfigBuilder<'buf, Broker>,
) -> Result<Self, minimq::ProtocolError> {
// Configure a will so that we can indicate whether or not we are connected.
Expand All @@ -221,7 +218,6 @@ where
Ok(Self {
mqtt,
state: sm::StateMachine::new(sm::Context::new(clock)),
settings,
prefix,
listing_state: None,
})
Expand Down Expand Up @@ -273,7 +269,7 @@ where
}
}

fn handle_republish(&mut self) {
fn handle_republish(&mut self, settings: &Settings) {
while self.mqtt.client().can_publish(QoS::AtMostOnce) {
let Some(topic) = self.state.context_mut().republish_state.next() else {
// If we got here, we completed iterating over the topics and published them all.
Expand All @@ -295,7 +291,7 @@ where
// If the topic is not present, we'll fail to serialize the setting into the
// payload and will never publish. The iterator has already incremented, so this is
// acceptable.
DeferredPublication::new(|buf| self.settings.get_json(&topic, buf))
DeferredPublication::new(|buf| settings.get_json(&topic, buf))
.topic(&prefixed_topic)
.finish()
.unwrap(),
Expand Down Expand Up @@ -375,9 +371,13 @@ where
///
/// # Returns
/// True if the settings changed. False otherwise.
pub fn handled_update<F, E>(&mut self, handler: F) -> Result<bool, minimq::Error<Stack::Error>>
pub fn handled_update<F, E>(
&mut self,
settings: &mut Settings,
handler: F,
) -> Result<bool, minimq::Error<Stack::Error>>
where
F: FnMut(&str, &mut Settings, &Settings) -> Result<(), E>,
F: FnMut(&str, &mut Settings, Settings) -> Result<(), E>,
E: core::fmt::Display,
{
if !self.mqtt.client().is_connected() {
Expand All @@ -400,7 +400,7 @@ where
.unwrap();
}
}
sm::States::RepublishingSettings => self.handle_republish(),
sm::States::RepublishingSettings => self.handle_republish(settings),

// Nothing to do in the active state.
sm::States::Active => {}
Expand All @@ -409,19 +409,20 @@ where
self.handle_listing();

// All states must handle MQTT traffic.
self.handle_mqtt_traffic(handler)
self.handle_mqtt_traffic(settings, handler)
}

fn handle_mqtt_traffic<F, E>(
&mut self,
settings: &mut Settings,
mut handler: F,
) -> Result<bool, minimq::Error<Stack::Error>>
where
F: FnMut(&str, &mut Settings, &Settings) -> Result<(), E>,
F: FnMut(&str, &mut Settings, Settings) -> Result<(), E>,
E: core::fmt::Display,
{
let mut updated = false;
match self.mqtt.poll(|client, topic, message, properties| {
let poll = self.mqtt.poll(|client, topic, message, properties| {
let Some(path) = topic.strip_prefix(self.prefix.as_str()) else {
log::info!("Unexpected topic prefix: {topic}");
return;
Expand Down Expand Up @@ -474,13 +475,12 @@ where

Command::Get { path } => {
let props = [ResponseCode::Ok.as_user_property()];
let Ok(message) =
DeferredPublication::new(|buf| self.settings.get_json(path, buf))
.properties(&props)
.reply(properties)
// Override the response topic with the path.
.qos(QoS::AtLeastOnce)
.finish()
let Ok(message) = DeferredPublication::new(|buf| settings.get_json(path, buf))
.properties(&props)
.reply(properties)
// Override the response topic with the path.
.qos(QoS::AtLeastOnce)
.finish()
else {
// If we can't create the publication, it's because there's no way to reply
// to the message. Since we don't know where to send things, abort now and
Expand All @@ -506,7 +506,7 @@ where
}

Command::Set { path, value } => {
let mut new_settings = self.settings.clone();
let mut new_settings = settings.clone();
if let Err(err) = new_settings.set_json(path, value) {
if let Ok(response) = DeferredPublication::new(|mut buf| {
let start = buf.len();
Expand All @@ -524,7 +524,7 @@ where

updated = true;

match handler(path, &mut self.settings, &new_settings) {
match handler(path, settings, new_settings) {
Ok(_) => {
if let Ok(response) = Publication::new("OK".as_bytes())
.properties(&[ResponseCode::Ok.as_user_property()])
Expand All @@ -551,7 +551,8 @@ where
}
}
}
}) {
});
match poll {
Ok(_) => Ok(updated),
Err(minimq::Error::SessionReset) => {
log::warn!("Session reset");
Expand All @@ -566,18 +567,13 @@ where
///
/// # Returns
/// True if the settings changed. False otherwise
pub fn update(&mut self) -> Result<bool, minimq::Error<Stack::Error>> {
self.handled_update(|_, old, new| {
*old = new.clone();
pub fn update(&mut self, settings: &mut Settings) -> Result<bool, minimq::Error<Stack::Error>> {
self.handled_update(settings, |_path, settings, new_settings| {
*settings = new_settings;
Result::<_, &'static str>::Ok(())
})
}

/// Get the current settings from miniconf.
pub fn settings(&self) -> &Settings {
&self.settings
}

/// Force republication of the current settings.
///
/// # Note
Expand Down
12 changes: 6 additions & 6 deletions tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ fn main() -> std::io::Result<()> {
minimq::ConfigBuilder::new(localhost.into(), &mut buffer).keepalive_interval(60),
);

// Construct a settings configuration interface.
let mut buffer = [0u8; 1024];
let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap();

Expand All @@ -107,7 +106,6 @@ fn main() -> std::io::Result<()> {
Stack,
"device",
StandardClock::default(),
Settings::default(),
minimq::ConfigBuilder::new(localhost.into(), &mut buffer).keepalive_interval(60),
)
.unwrap();
Expand All @@ -116,6 +114,8 @@ fn main() -> std::io::Result<()> {
let mut state = TestState::started();
let mut timer = Timer::new(std::time::Duration::from_millis(100));

let mut settings = Settings::default();

loop {
// First, update our client's MQTT state.
mqtt.poll(|_, _, msg, _| {
Expand All @@ -125,7 +125,7 @@ fn main() -> std::io::Result<()> {
.unwrap();

// Next, service the settings interface and progress the test.
let setting_update = interface.update().unwrap();
let setting_update = interface.update(&mut settings).unwrap();
match state {
TestState::Started(_) => {
if timer.is_complete() && mqtt.client().is_connected() {
Expand Down Expand Up @@ -174,9 +174,9 @@ fn main() -> std::io::Result<()> {
}
TestState::Complete(_) => {
// Verify the settings all have the correct value.
info!("Verifying settings: {:?}", interface.settings());
assert!(interface.settings().data == 500);
assert!(interface.settings().more.inner == 100);
info!("Verifying settings: {:?}", settings);
assert!(settings.data == 500);
assert!(settings.more.inner == 100);
std::process::exit(0);
}

Expand Down
5 changes: 3 additions & 2 deletions tests/republish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,17 @@ async fn main() {
Stack,
"republish/device",
StandardClock::default(),
Settings::default(),
minimq::ConfigBuilder::new(localhost.into(), &mut buffer).keepalive_interval(60),
)
.unwrap();

let mut settings = Settings::default();

// Poll the client for 5 seconds. This should be enough time for the miniconf client to publish
// all settings values.
for _ in 0..500 {
// The interface should never indicate a settings update during the republish process.
assert!(!interface.update().unwrap());
assert!(!interface.update(&mut settings).unwrap());
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}

Expand Down
7 changes: 4 additions & 3 deletions tests/validation_failure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,17 @@ async fn main() {
Stack,
"validation_failure/device",
StandardClock::default(),
Settings::default(),
minimq::ConfigBuilder::new(localhost.into(), &mut buffer).keepalive_interval(60),
)
.unwrap();

let mut settings = Settings::default();

// Update the client until the exit
let mut should_exit = false;
loop {
interface
.handled_update(|_path, _old_settings, new_settings| {
.handled_update(&mut settings, |_path, _settings, new_settings| {
log::info!("Handling setting update");
if new_settings.error {
should_exit = true;
Expand All @@ -116,5 +117,5 @@ async fn main() {
}

// Check that the error setting did not stick.
assert!(!interface.settings().error);
assert!(!settings.error);
}

0 comments on commit 9627c6b

Please sign in to comment.