Skip to content

Commit

Permalink
Stop publication cache task on disconnected channels
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzzypixelz committed Dec 17, 2024
1 parent 3744075 commit 26481e7
Showing 1 changed file with 41 additions and 37 deletions.
78 changes: 41 additions & 37 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,34 +260,53 @@ impl PublicationCache {
tokio::select! {
// on publication received by the local subscriber, store it
sample = sub_recv.recv_async() => {
if let Ok(sample) = sample {
let queryable_key_expr: KeyExpr<'_> = if let Some(prefix) = &queryable_prefix {
prefix.join(&sample.key_expr()).unwrap().into()
} else {
sample.key_expr().clone()
};

if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) {
if queue.len() >= history {
queue.pop_front();
}
queue.push_back(sample);
} else if cache.len() >= limit {
tracing::error!("PublicationCache on {}: resource_limit exceeded - can't cache publication for a new resource",
pub_key_expr);
} else {
let mut queue: VecDeque<Sample> = VecDeque::new();
queue.push_back(sample);
cache.insert(queryable_key_expr.into(), queue);
let Ok(sample) = sample else {
return;
};

let queryable_key_expr: KeyExpr<'_> = if let Some(prefix) = &queryable_prefix {
prefix.join(&sample.key_expr()).unwrap().into()
} else {
sample.key_expr().clone()
};

if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) {
if queue.len() >= history {
queue.pop_front();
}
queue.push_back(sample);
} else if cache.len() >= limit {
tracing::error!("PublicationCache on {}: resource_limit exceeded - can't cache publication for a new resource",
pub_key_expr);
} else {
let mut queue: VecDeque<Sample> = VecDeque::new();
queue.push_back(sample);
cache.insert(queryable_key_expr.into(), queue);
}
},

// on query, reply with cached content
query = quer_recv.recv_async() => {
if let Ok(query) = query {
if !query.key_expr().as_str().contains('*') {
if let Some(queue) = cache.get(query.key_expr().as_keyexpr()) {
let Ok(query) = query else {
return;
};

if !query.key_expr().as_str().contains('*') {
if let Some(queue) = cache.get(query.key_expr().as_keyexpr()) {
for sample in queue {
if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
if !time_range.contains(timestamp.get_time().to_system_time()){
continue;
}
}
if let Err(e) = query.reply_sample(sample.clone()).await {
tracing::warn!("Error replying to query: {}", e);
}
}
}
} else {
for (key_expr, queue) in cache.iter() {
if query.key_expr().intersects(unsafe{ keyexpr::from_str_unchecked(key_expr) }) {
for sample in queue {
if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
if !time_range.contains(timestamp.get_time().to_system_time()){
Expand All @@ -299,21 +318,6 @@ impl PublicationCache {
}
}
}
} else {
for (key_expr, queue) in cache.iter() {
if query.key_expr().intersects(unsafe{ keyexpr::from_str_unchecked(key_expr) }) {
for sample in queue {
if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
if !time_range.contains(timestamp.get_time().to_system_time()){
continue;
}
}
if let Err(e) = query.reply_sample(sample.clone()).await {
tracing::warn!("Error replying to query: {}", e);
}
}
}
}
}
}
},
Expand Down

0 comments on commit 26481e7

Please sign in to comment.