From d843f10a128b1203c57e8fda786620d79af7d8fe Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Fri, 29 Nov 2024 20:14:01 +0000 Subject: [PATCH] Make sure a failure to process a subscription does not break the whole stack --- rs-matter/src/data_model/core.rs | 88 +++++++++++++++++++++----------- 1 file changed, 57 insertions(+), 31 deletions(-) diff --git a/rs-matter/src/data_model/core.rs b/rs-matter/src/data_model/core.rs index 95ae54ed..7405e2b9 100644 --- a/rs-matter/src/data_model/core.rs +++ b/rs-matter/src/data_model/core.rs @@ -521,41 +521,26 @@ where .unwrap(); let rx = self.subscriptions_buffers.borrow_mut().remove(index).buffer; - let mut exchange = if let Some(session_id) = session_id { - Exchange::initiate_for_session(matter, session_id)? - } else { - // Commented out as we have issues on HomeKit with that: - // https://github.com/ivmarkov/esp-idf-matter/issues/3 - // Exchange::initiate(matter, fabric_idx, peer_node_id, true).await? - Err(ErrorCode::NoSession)? - }; - - if let Some(mut tx) = self.buffers.get().await { - let primed = self - .report_data( - id, - fabric_idx.get(), - peer_node_id, - &rx, - &mut tx, - &mut exchange, - false, - ) - .await?; - - exchange.acknowledge().await?; - - if primed && self.subscriptions.mark_reported(id) { - let _ = - self.subscriptions_buffers - .borrow_mut() - .push(SubscriptionBuffer { + let result = self + .process_subscription(matter, fabric_idx, peer_node_id, session_id, id, &rx) + .await; + + match result { + Ok(primed) => { + if primed && self.subscriptions.mark_reported(id) { + let _ = self.subscriptions_buffers.borrow_mut().push( + SubscriptionBuffer { fabric_idx, peer_node_id, subscription_id: id, buffer: rx, - }); - subscribed.set(true); + }, + ); + subscribed.set(true); + } + } + Err(e) => { + error!("Error while processing subscription: {:?}", e); } } } else { @@ -565,6 +550,47 @@ where } } + async fn process_subscription( + &self, + matter: &Matter<'_>, + fabric_idx: NonZeroU8, + peer_node_id: u64, + session_id: Option, + id: u32, + rx: &[u8], + ) -> Result { + let mut exchange = if let Some(session_id) = session_id { + Exchange::initiate_for_session(matter, session_id)? + } else { + // Commented out as we have issues on HomeKit with that: + // https://github.com/ivmarkov/esp-idf-matter/issues/3 + // Exchange::initiate(matter, fabric_idx, peer_node_id, true).await? + Err(ErrorCode::NoSession)? + }; + + if let Some(mut tx) = self.buffers.get().await { + let primed = self + .report_data( + id, + fabric_idx.get(), + peer_node_id, + rx, + &mut tx, + &mut exchange, + false, + ) + .await?; + + exchange.acknowledge().await?; + + Ok(primed) + } else { + error!("No TX buffer available for processing subscription [F:{fabric_idx:x},P:{peer_node_id:x}]::{id}"); + + Ok(false) + } + } + async fn timed(&self, exchange: &mut Exchange<'_>) -> Result { let req = TimedReq::from_tlv(&get_root_node_struct(exchange.rx()?.payload())?)?; debug!("IM: Timed request: {:?}", req);