Skip to content

Commit

Permalink
Merge pull request #217 from ivmarkov/psub
Browse files Browse the repository at this point in the history
Failure to process a subscription should not unwind the whole stack
  • Loading branch information
kedars authored Jan 16, 2025
2 parents 2921a36 + d843f10 commit a2a46b0
Showing 1 changed file with 57 additions and 31 deletions.
88 changes: 57 additions & 31 deletions rs-matter/src/data_model/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,41 +521,26 @@ where
.unwrap();
let rx = self.subscriptions_buffers.borrow_mut().remove(index).buffer;

let mut exchange = if let Some(session_id) = session_id {
Exchange::initiate_for_session(matter, session_id)?
} else {
// Commented out as we have issues on HomeKit with that:
// https://github.com/ivmarkov/esp-idf-matter/issues/3
// Exchange::initiate(matter, fabric_idx, peer_node_id, true).await?
Err(ErrorCode::NoSession)?
};

if let Some(mut tx) = self.buffers.get().await {
let primed = self
.report_data(
id,
fabric_idx.get(),
peer_node_id,
&rx,
&mut tx,
&mut exchange,
false,
)
.await?;

exchange.acknowledge().await?;

if primed && self.subscriptions.mark_reported(id) {
let _ =
self.subscriptions_buffers
.borrow_mut()
.push(SubscriptionBuffer {
let result = self
.process_subscription(matter, fabric_idx, peer_node_id, session_id, id, &rx)
.await;

match result {
Ok(primed) => {
if primed && self.subscriptions.mark_reported(id) {
let _ = self.subscriptions_buffers.borrow_mut().push(
SubscriptionBuffer {
fabric_idx,
peer_node_id,
subscription_id: id,
buffer: rx,
});
subscribed.set(true);
},
);
subscribed.set(true);
}
}
Err(e) => {
error!("Error while processing subscription: {:?}", e);
}
}
} else {
Expand All @@ -565,6 +550,47 @@ where
}
}

async fn process_subscription(
&self,
matter: &Matter<'_>,
fabric_idx: NonZeroU8,
peer_node_id: u64,
session_id: Option<u32>,
id: u32,
rx: &[u8],
) -> Result<bool, Error> {
let mut exchange = if let Some(session_id) = session_id {
Exchange::initiate_for_session(matter, session_id)?
} else {
// Commented out as we have issues on HomeKit with that:
// https://github.com/ivmarkov/esp-idf-matter/issues/3
// Exchange::initiate(matter, fabric_idx, peer_node_id, true).await?
Err(ErrorCode::NoSession)?
};

if let Some(mut tx) = self.buffers.get().await {
let primed = self
.report_data(
id,
fabric_idx.get(),
peer_node_id,
rx,
&mut tx,
&mut exchange,
false,
)
.await?;

exchange.acknowledge().await?;

Ok(primed)
} else {
error!("No TX buffer available for processing subscription [F:{fabric_idx:x},P:{peer_node_id:x}]::{id}");

Ok(false)
}
}

async fn timed(&self, exchange: &mut Exchange<'_>) -> Result<Duration, Error> {
let req = TimedReq::from_tlv(&get_root_node_struct(exchange.rx()?.payload())?)?;
debug!("IM: Timed request: {:?}", req);
Expand Down

0 comments on commit a2a46b0

Please sign in to comment.