From fce78e866850d6add0011c50c18587e9a8982887 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Tue, 14 Jan 2025 12:27:15 +0100 Subject: [PATCH 1/3] Only compute data/query routes on nonwild keys with subs/qabls --- zenoh/src/net/routing/dispatcher/pubsub.rs | 11 +++++++---- zenoh/src/net/routing/dispatcher/queries.rs | 9 ++++++--- zenoh/src/net/routing/dispatcher/resource.rs | 8 ++++++++ zenoh/src/net/routing/hat/client/mod.rs | 12 +++++++++--- zenoh/src/net/routing/hat/linkstate_peer/mod.rs | 12 +++++++++--- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 12 +++++++++--- zenoh/src/net/routing/hat/router/mod.rs | 12 +++++++++--- 7 files changed, 57 insertions(+), 19 deletions(-) diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index b73e591873..e193281e85 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -224,7 +224,7 @@ pub(crate) fn compute_data_routes(tables: &Tables, expr: &mut RoutingExpr) -> Da } pub(crate) fn update_data_routes(tables: &Tables, res: &mut Arc) { - if res.context.is_some() { + if res.context.is_some() && !res.expr().contains('*') && res.has_subs() { let mut res_mut = res.clone(); let res_mut = get_mut_unchecked(&mut res_mut); compute_data_routes_( @@ -232,6 +232,7 @@ pub(crate) fn update_data_routes(tables: &Tables, res: &mut Arc) { &mut res_mut.context_mut().data_routes, &mut RoutingExpr::new(res, ""), ); + res_mut.context_mut().valid_data_routes = true; } } @@ -249,11 +250,13 @@ pub(crate) fn compute_matches_data_routes<'a>( ) -> Vec<(Arc, DataRoutes)> { let mut routes = vec![]; if res.context.is_some() { - let mut expr = RoutingExpr::new(res, ""); - routes.push((res.clone(), compute_data_routes(tables, &mut expr))); + if !res.expr().contains('*') && res.has_subs() { + let mut expr = RoutingExpr::new(res, ""); + routes.push((res.clone(), compute_data_routes(tables, &mut expr))); + } for match_ in &res.context().matches { let match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, res) { + if !Arc::ptr_eq(&match_, res) && !match_.expr().contains('*') && match_.has_subs() { let mut expr = RoutingExpr::new(&match_, ""); let match_routes = compute_data_routes(tables, &mut expr); routes.push((match_, match_routes)); diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 13b92f6fc0..5d5f5064aa 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -252,7 +252,7 @@ pub(crate) fn compute_query_routes(tables: &Tables, res: &Arc) -> Quer } pub(crate) fn update_query_routes(tables: &Tables, res: &Arc) { - if res.context.is_some() { + if res.context.is_some() && !res.expr().contains('*') && res.has_qabls() { let mut res_mut = res.clone(); let res_mut = get_mut_unchecked(&mut res_mut); compute_query_routes_( @@ -260,6 +260,7 @@ pub(crate) fn update_query_routes(tables: &Tables, res: &Arc) { &mut res_mut.context_mut().query_routes, &mut RoutingExpr::new(res, ""), ); + res_mut.context_mut().valid_query_routes = true; } } @@ -277,10 +278,12 @@ pub(crate) fn compute_matches_query_routes( ) -> Vec<(Arc, QueryRoutes)> { let mut routes = vec![]; if res.context.is_some() { - routes.push((res.clone(), compute_query_routes(tables, res))); + if !res.expr().contains('*') && res.has_qabls() { + routes.push((res.clone(), compute_query_routes(tables, res))); + } for match_ in &res.context().matches { let match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, res) { + if !Arc::ptr_eq(&match_, res) && !match_.expr().contains('*') && match_.has_qabls() { let match_routes = compute_query_routes(tables, &match_); routes.push((match_, match_routes)); } diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index b54497ad47..330aeacf64 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -258,6 +258,14 @@ impl Resource { } } + pub(crate) fn has_subs(&self) -> bool { + self.session_ctxs.values().any(|sc| sc.subs.is_some()) + } + + pub(crate) fn has_qabls(&self) -> bool { + self.session_ctxs.values().any(|sc| sc.qabl.is_some()) + } + #[inline] pub(crate) fn data_route(&self, whatami: WhatAmI, context: NodeId) -> Option> { match &self.context { diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index 216e8732c4..c5ce9eb6c2 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -217,11 +217,17 @@ impl HatBaseTrait for HatCode { let mut matches_query_routes = vec![]; let rtables = zread!(tables.tables); for _match in subs_matches.drain(..) { - let mut expr = RoutingExpr::new(&_match, ""); - matches_data_routes.push((_match.clone(), compute_data_routes(&rtables, &mut expr))); + if !_match.expr().contains('*') && _match.has_subs() { + let mut expr = RoutingExpr::new(&_match, ""); + matches_data_routes + .push((_match.clone(), compute_data_routes(&rtables, &mut expr))); + } } for _match in qabls_matches.drain(..) { - matches_query_routes.push((_match.clone(), compute_query_routes(&rtables, &_match))); + if !_match.expr().contains('*') && _match.has_qabls() { + matches_query_routes + .push((_match.clone(), compute_query_routes(&rtables, &_match))); + } } drop(rtables); diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index 63d9c50eeb..20933aebae 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -352,11 +352,17 @@ impl HatBaseTrait for HatCode { let mut matches_query_routes = vec![]; let rtables = zread!(tables.tables); for _match in subs_matches.drain(..) { - let mut expr = RoutingExpr::new(&_match, ""); - matches_data_routes.push((_match.clone(), compute_data_routes(&rtables, &mut expr))); + if !_match.expr().contains('*') && _match.has_subs() { + let mut expr = RoutingExpr::new(&_match, ""); + matches_data_routes + .push((_match.clone(), compute_data_routes(&rtables, &mut expr))); + } } for _match in qabls_matches.drain(..) { - matches_query_routes.push((_match.clone(), compute_query_routes(&rtables, &_match))); + if !_match.expr().contains('*') && _match.has_qabls() { + matches_query_routes + .push((_match.clone(), compute_query_routes(&rtables, &_match))); + } } drop(rtables); diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 9beefbff5e..c9162f4ef3 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -303,11 +303,17 @@ impl HatBaseTrait for HatCode { let mut matches_query_routes = vec![]; let rtables = zread!(tables.tables); for _match in subs_matches.drain(..) { - let mut expr = RoutingExpr::new(&_match, ""); - matches_data_routes.push((_match.clone(), compute_data_routes(&rtables, &mut expr))); + if !_match.expr().contains('*') && _match.has_subs() { + let mut expr = RoutingExpr::new(&_match, ""); + matches_data_routes + .push((_match.clone(), compute_data_routes(&rtables, &mut expr))); + } } for _match in qabls_matches.drain(..) { - matches_query_routes.push((_match.clone(), compute_query_routes(&rtables, &_match))); + if !_match.expr().contains('*') && _match.has_qabls() { + matches_query_routes + .push((_match.clone(), compute_query_routes(&rtables, &_match))); + } } drop(rtables); diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index 5665009354..d71c06fe5c 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -522,11 +522,17 @@ impl HatBaseTrait for HatCode { let mut matches_query_routes = vec![]; let rtables = zread!(tables.tables); for _match in subs_matches.drain(..) { - let mut expr = RoutingExpr::new(&_match, ""); - matches_data_routes.push((_match.clone(), compute_data_routes(&rtables, &mut expr))); + if !_match.expr().contains('*') && _match.has_subs() { + let mut expr = RoutingExpr::new(&_match, ""); + matches_data_routes + .push((_match.clone(), compute_data_routes(&rtables, &mut expr))); + } } for _match in qabls_matches.drain(..) { - matches_query_routes.push((_match.clone(), compute_query_routes(&rtables, &_match))); + if !_match.expr().contains('*') && _match.has_qabls() { + matches_query_routes + .push((_match.clone(), compute_query_routes(&rtables, &_match))); + } } drop(rtables); From 2ae81044550956208bf684f56def39d279a8c543 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 15 Jan 2025 10:34:30 +0100 Subject: [PATCH 2/3] Fix regression in resource cleaning --- zenoh/src/net/routing/hat/client/mod.rs | 31 ++++++++++--------- .../src/net/routing/hat/linkstate_peer/mod.rs | 31 ++++++++++--------- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 31 ++++++++++--------- zenoh/src/net/routing/hat/router/mod.rs | 31 ++++++++++--------- 4 files changed, 68 insertions(+), 56 deletions(-) diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index c5ce9eb6c2..13690733bd 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -217,31 +217,34 @@ impl HatBaseTrait for HatCode { let mut matches_query_routes = vec![]; let rtables = zread!(tables.tables); for _match in subs_matches.drain(..) { - if !_match.expr().contains('*') && _match.has_subs() { + let route = (!_match.expr().contains('*') && _match.has_subs()).then(|| { let mut expr = RoutingExpr::new(&_match, ""); - matches_data_routes - .push((_match.clone(), compute_data_routes(&rtables, &mut expr))); - } + compute_data_routes(&rtables, &mut expr) + }); + matches_data_routes.push((_match.clone(), route)); } for _match in qabls_matches.drain(..) { - if !_match.expr().contains('*') && _match.has_qabls() { - matches_query_routes - .push((_match.clone(), compute_query_routes(&rtables, &_match))); - } + let route = (!_match.expr().contains('*') && _match.has_qabls()) + .then(|| compute_query_routes(&rtables, &_match)); + matches_query_routes.push((_match.clone(), route)); } drop(rtables); let mut wtables = zwrite!(tables.tables); for (mut res, data_routes) in matches_data_routes { - get_mut_unchecked(&mut res) - .context_mut() - .update_data_routes(data_routes); + if let Some(data_routes) = data_routes { + get_mut_unchecked(&mut res) + .context_mut() + .update_data_routes(data_routes); + } Resource::clean(&mut res); } for (mut res, query_routes) in matches_query_routes { - get_mut_unchecked(&mut res) - .context_mut() - .update_query_routes(query_routes); + if let Some(query_routes) = query_routes { + get_mut_unchecked(&mut res) + .context_mut() + .update_query_routes(query_routes); + } Resource::clean(&mut res); } wtables.faces.remove(&face.id); diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index 20933aebae..1dd6e7d5a7 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -352,31 +352,34 @@ impl HatBaseTrait for HatCode { let mut matches_query_routes = vec![]; let rtables = zread!(tables.tables); for _match in subs_matches.drain(..) { - if !_match.expr().contains('*') && _match.has_subs() { + let route = (!_match.expr().contains('*') && _match.has_subs()).then(|| { let mut expr = RoutingExpr::new(&_match, ""); - matches_data_routes - .push((_match.clone(), compute_data_routes(&rtables, &mut expr))); - } + compute_data_routes(&rtables, &mut expr) + }); + matches_data_routes.push((_match.clone(), route)); } for _match in qabls_matches.drain(..) { - if !_match.expr().contains('*') && _match.has_qabls() { - matches_query_routes - .push((_match.clone(), compute_query_routes(&rtables, &_match))); - } + let route = (!_match.expr().contains('*') && _match.has_qabls()) + .then(|| compute_query_routes(&rtables, &_match)); + matches_query_routes.push((_match.clone(), route)); } drop(rtables); let mut wtables = zwrite!(tables.tables); for (mut res, data_routes) in matches_data_routes { - get_mut_unchecked(&mut res) - .context_mut() - .update_data_routes(data_routes); + if let Some(data_routes) = data_routes { + get_mut_unchecked(&mut res) + .context_mut() + .update_data_routes(data_routes); + } Resource::clean(&mut res); } for (mut res, query_routes) in matches_query_routes { - get_mut_unchecked(&mut res) - .context_mut() - .update_query_routes(query_routes); + if let Some(query_routes) = query_routes { + get_mut_unchecked(&mut res) + .context_mut() + .update_query_routes(query_routes); + } Resource::clean(&mut res); } wtables.faces.remove(&face.id); diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index c9162f4ef3..7e08a3c92c 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -303,31 +303,34 @@ impl HatBaseTrait for HatCode { let mut matches_query_routes = vec![]; let rtables = zread!(tables.tables); for _match in subs_matches.drain(..) { - if !_match.expr().contains('*') && _match.has_subs() { + let route = (!_match.expr().contains('*') && _match.has_subs()).then(|| { let mut expr = RoutingExpr::new(&_match, ""); - matches_data_routes - .push((_match.clone(), compute_data_routes(&rtables, &mut expr))); - } + compute_data_routes(&rtables, &mut expr) + }); + matches_data_routes.push((_match.clone(), route)); } for _match in qabls_matches.drain(..) { - if !_match.expr().contains('*') && _match.has_qabls() { - matches_query_routes - .push((_match.clone(), compute_query_routes(&rtables, &_match))); - } + let route = (!_match.expr().contains('*') && _match.has_qabls()) + .then(|| compute_query_routes(&rtables, &_match)); + matches_query_routes.push((_match.clone(), route)); } drop(rtables); let mut wtables = zwrite!(tables.tables); for (mut res, data_routes) in matches_data_routes { - get_mut_unchecked(&mut res) - .context_mut() - .update_data_routes(data_routes); + if let Some(data_routes) = data_routes { + get_mut_unchecked(&mut res) + .context_mut() + .update_data_routes(data_routes); + } Resource::clean(&mut res); } for (mut res, query_routes) in matches_query_routes { - get_mut_unchecked(&mut res) - .context_mut() - .update_query_routes(query_routes); + if let Some(query_routes) = query_routes { + get_mut_unchecked(&mut res) + .context_mut() + .update_query_routes(query_routes); + } Resource::clean(&mut res); } wtables.faces.remove(&face.id); diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index d71c06fe5c..9300c29a90 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -522,31 +522,34 @@ impl HatBaseTrait for HatCode { let mut matches_query_routes = vec![]; let rtables = zread!(tables.tables); for _match in subs_matches.drain(..) { - if !_match.expr().contains('*') && _match.has_subs() { + let route = (!_match.expr().contains('*') && _match.has_subs()).then(|| { let mut expr = RoutingExpr::new(&_match, ""); - matches_data_routes - .push((_match.clone(), compute_data_routes(&rtables, &mut expr))); - } + compute_data_routes(&rtables, &mut expr) + }); + matches_data_routes.push((_match.clone(), route)); } for _match in qabls_matches.drain(..) { - if !_match.expr().contains('*') && _match.has_qabls() { - matches_query_routes - .push((_match.clone(), compute_query_routes(&rtables, &_match))); - } + let route = (!_match.expr().contains('*') && _match.has_qabls()) + .then(|| compute_query_routes(&rtables, &_match)); + matches_query_routes.push((_match.clone(), route)); } drop(rtables); let mut wtables = zwrite!(tables.tables); for (mut res, data_routes) in matches_data_routes { - get_mut_unchecked(&mut res) - .context_mut() - .update_data_routes(data_routes); + if let Some(data_routes) = data_routes { + get_mut_unchecked(&mut res) + .context_mut() + .update_data_routes(data_routes); + } Resource::clean(&mut res); } for (mut res, query_routes) in matches_query_routes { - get_mut_unchecked(&mut res) - .context_mut() - .update_query_routes(query_routes); + if let Some(query_routes) = query_routes { + get_mut_unchecked(&mut res) + .context_mut() + .update_query_routes(query_routes); + } Resource::clean(&mut res); } wtables.faces.remove(&face.id); From bcc7535dfb558a2765a80836170fe20fea667c50 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 15 Jan 2025 14:40:29 +0100 Subject: [PATCH 3/3] Compute identical routes from different source types once --- zenoh/src/net/routing/dispatcher/pubsub.rs | 53 +++-------------- zenoh/src/net/routing/dispatcher/queries.rs | 47 ++------------- zenoh/src/net/routing/dispatcher/resource.rs | 7 --- zenoh/src/net/routing/hat/client/mod.rs | 11 +--- zenoh/src/net/routing/hat/client/pubsub.rs | 19 ++++-- zenoh/src/net/routing/hat/client/queries.rs | 19 ++++-- .../src/net/routing/hat/linkstate_peer/mod.rs | 19 +----- .../net/routing/hat/linkstate_peer/pubsub.rs | 39 +++++++++++-- .../net/routing/hat/linkstate_peer/queries.rs | 39 +++++++++++-- zenoh/src/net/routing/hat/mod.rs | 11 +++- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 11 +--- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 21 +++++-- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 20 +++++-- zenoh/src/net/routing/hat/router/mod.rs | 31 +--------- zenoh/src/net/routing/hat/router/pubsub.rs | 56 ++++++++++++++++-- zenoh/src/net/routing/hat/router/queries.rs | 58 +++++++++++++++++-- 16 files changed, 254 insertions(+), 207 deletions(-) diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index e193281e85..2987bfd93f 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -11,11 +11,14 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::{collections::HashMap, sync::Arc}; + +#[zenoh_macros::unstable] +use std::collections::HashMap; +use std::sync::Arc; use zenoh_core::zread; use zenoh_protocol::{ - core::{key_expr::keyexpr, Reliability, WhatAmI, WireExpr}, + core::{key_expr::keyexpr, Reliability, WireExpr}, network::{ declare::{ext, SubscriberId}, Push, @@ -177,49 +180,11 @@ pub(crate) fn undeclare_subscription( } } -fn compute_data_routes_(tables: &Tables, routes: &mut DataRoutes, expr: &mut RoutingExpr) { - let indexes = tables.hat_code.get_data_routes_entries(tables); - - let max_idx = indexes.routers.iter().max().unwrap(); - routes - .routers - .resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new())); - - for idx in indexes.routers { - routes.routers[idx as usize] = - tables - .hat_code - .compute_data_route(tables, expr, idx, WhatAmI::Router); - } - - let max_idx = indexes.peers.iter().max().unwrap(); - routes - .peers - .resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new())); - - for idx in indexes.peers { - routes.peers[idx as usize] = - tables - .hat_code - .compute_data_route(tables, expr, idx, WhatAmI::Peer); - } - - let max_idx = indexes.clients.iter().max().unwrap(); - routes - .clients - .resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new())); - - for idx in indexes.clients { - routes.clients[idx as usize] = - tables - .hat_code - .compute_data_route(tables, expr, idx, WhatAmI::Client); - } -} - pub(crate) fn compute_data_routes(tables: &Tables, expr: &mut RoutingExpr) -> DataRoutes { let mut routes = DataRoutes::default(); - compute_data_routes_(tables, &mut routes, expr); + tables + .hat_code + .compute_data_routes(tables, &mut routes, expr); routes } @@ -227,7 +192,7 @@ pub(crate) fn update_data_routes(tables: &Tables, res: &mut Arc) { if res.context.is_some() && !res.expr().contains('*') && res.has_subs() { let mut res_mut = res.clone(); let res_mut = get_mut_unchecked(&mut res_mut); - compute_data_routes_( + tables.hat_code.compute_data_routes( tables, &mut res_mut.context_mut().data_routes, &mut RoutingExpr::new(res, ""), diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 5d5f5064aa..2bc92fdc3c 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -20,7 +20,6 @@ use std::{ use async_trait::async_trait; use tokio_util::sync::CancellationToken; use zenoh_buffers::ZBuf; -use zenoh_config::WhatAmI; #[cfg(feature = "stats")] use zenoh_protocol::zenoh::reply::ReplyBody; use zenoh_protocol::{ @@ -205,49 +204,11 @@ pub(crate) fn undeclare_queryable( } } -fn compute_query_routes_(tables: &Tables, routes: &mut QueryRoutes, expr: &mut RoutingExpr) { - let indexes = tables.hat_code.get_query_routes_entries(tables); - - let max_idx = indexes.routers.iter().max().unwrap(); - routes.routers.resize_with((*max_idx as usize) + 1, || { - Arc::new(QueryTargetQablSet::new()) - }); - - for idx in indexes.routers { - routes.routers[idx as usize] = - tables - .hat_code - .compute_query_route(tables, expr, idx, WhatAmI::Router); - } - - let max_idx = indexes.peers.iter().max().unwrap(); - routes.peers.resize_with((*max_idx as usize) + 1, || { - Arc::new(QueryTargetQablSet::new()) - }); - - for idx in indexes.peers { - routes.peers[idx as usize] = - tables - .hat_code - .compute_query_route(tables, expr, idx, WhatAmI::Peer); - } - - let max_idx = indexes.clients.iter().max().unwrap(); - routes.clients.resize_with((*max_idx as usize) + 1, || { - Arc::new(QueryTargetQablSet::new()) - }); - - for idx in indexes.clients { - routes.clients[idx as usize] = - tables - .hat_code - .compute_query_route(tables, expr, idx, WhatAmI::Client); - } -} - pub(crate) fn compute_query_routes(tables: &Tables, res: &Arc) -> QueryRoutes { let mut routes = QueryRoutes::default(); - compute_query_routes_(tables, &mut routes, &mut RoutingExpr::new(res, "")); + tables + .hat_code + .compute_query_routes(tables, &mut routes, &mut RoutingExpr::new(res, "")); routes } @@ -255,7 +216,7 @@ pub(crate) fn update_query_routes(tables: &Tables, res: &Arc) { if res.context.is_some() && !res.expr().contains('*') && res.has_qabls() { let mut res_mut = res.clone(); let res_mut = get_mut_unchecked(&mut res_mut); - compute_query_routes_( + tables.hat_code.compute_query_routes( tables, &mut res_mut.context_mut().query_routes, &mut RoutingExpr::new(res, ""), diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 330aeacf64..58e1459ece 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -75,13 +75,6 @@ impl SessionContext { } } -#[derive(Default)] -pub(crate) struct RoutesIndexes { - pub(crate) routers: Vec, - pub(crate) peers: Vec, - pub(crate) clients: Vec, -} - #[derive(Default)] pub(crate) struct DataRoutes { pub(crate) routers: Vec>, diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index 13690733bd..ee4e2a0c6a 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -49,7 +49,7 @@ use super::{ use crate::net::{ routing::{ dispatcher::{face::Face, interests::RemoteInterest}, - router::{compute_data_routes, compute_query_routes, RoutesIndexes}, + router::{compute_data_routes, compute_query_routes}, }, runtime::Runtime, }; @@ -330,12 +330,3 @@ impl HatFace { } impl HatTrait for HatCode {} - -#[inline] -fn get_routes_entries() -> RoutesIndexes { - RoutesIndexes { - routers: vec![0], - peers: vec![0], - clients: vec![0], - } -} diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 4b395de6df..fbdf19efea 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -26,7 +26,7 @@ use zenoh_protocol::{ }; use zenoh_sync::get_mut_unchecked; -use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace}; +use super::{face_hat, face_hat_mut, HatCode, HatFace}; use crate::{ key_expr::KeyExpr, net::routing::{ @@ -36,8 +36,8 @@ use crate::{ resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, - hat::{HatPubSubTrait, SendDeclare, Sources}, - router::{update_data_routes_from, RoutesIndexes}, + hat::{DataRoutes, HatPubSubTrait, SendDeclare, Sources}, + router::update_data_routes_from, RoutingContext, }, }; @@ -402,8 +402,17 @@ impl HatPubSubTrait for HatCode { Arc::new(route) } - fn get_data_routes_entries(&self, _tables: &Tables) -> RoutesIndexes { - get_routes_entries() + fn compute_data_routes( + &self, + tables: &Tables, + routes: &mut DataRoutes, + expr: &mut RoutingExpr, + ) { + let route = self.compute_data_route(tables, expr, 0, WhatAmI::Peer); + routes.routers.resize_with(1, || route.clone()); + routes.peers.resize_with(1, || route.clone()); + let route = self.compute_data_route(tables, expr, 0, WhatAmI::Client); + routes.clients.resize_with(1, || route.clone()); } #[zenoh_macros::unstable] diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index a7b07f407b..9db7e319c6 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -32,7 +32,7 @@ use zenoh_protocol::{ }; use zenoh_sync::get_mut_unchecked; -use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace}; +use super::{face_hat, face_hat_mut, HatCode, HatFace}; use crate::{ key_expr::KeyExpr, net::routing::{ @@ -41,8 +41,8 @@ use crate::{ resource::{NodeId, Resource, SessionContext}, tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, }, - hat::{HatQueriesTrait, SendDeclare, Sources}, - router::{update_query_routes_from, RoutesIndexes}, + hat::{HatQueriesTrait, QueryRoutes, SendDeclare, Sources}, + router::update_query_routes_from, RoutingContext, }, }; @@ -427,8 +427,17 @@ impl HatQueriesTrait for HatCode { Arc::new(route) } - fn get_query_routes_entries(&self, _tables: &Tables) -> RoutesIndexes { - get_routes_entries() + fn compute_query_routes( + &self, + tables: &Tables, + routes: &mut QueryRoutes, + expr: &mut RoutingExpr, + ) { + let route = self.compute_query_route(tables, expr, 0, WhatAmI::Peer); + routes.routers.resize_with(1, || route.clone()); + routes.peers.resize_with(1, || route.clone()); + let route = self.compute_query_route(tables, expr, 0, WhatAmI::Client); + routes.clients.resize_with(1, || route.clone()); } #[cfg(feature = "unstable")] diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index 1dd6e7d5a7..748bf8c132 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -58,7 +58,7 @@ use crate::net::{ routing::{ dispatcher::{face::Face, interests::RemoteInterest}, hat::TREES_COMPUTATION_DELAY_MS, - router::{compute_data_routes, compute_query_routes, RoutesIndexes}, + router::{compute_data_routes, compute_query_routes}, }, runtime::Runtime, }; @@ -554,20 +554,3 @@ fn get_peer(tables: &Tables, face: &Arc, nodeid: NodeId) -> Option RoutesIndexes { - let indexes = hat!(tables) - .linkstatepeers_net - .as_ref() - .unwrap() - .graph - .node_indices() - .map(|i| i.index() as NodeId) - .collect::>(); - RoutesIndexes { - routers: indexes.clone(), - peers: indexes, - clients: vec![0], - } -} diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index 6dfdbc0f76..e2799475cd 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -31,8 +31,8 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{ - face_hat, face_hat_mut, get_peer, get_routes_entries, hat, hat_mut, network::Network, res_hat, - res_hat_mut, HatCode, HatContext, HatFace, HatTables, + face_hat, face_hat_mut, get_peer, hat, hat_mut, network::Network, res_hat, res_hat_mut, + HatCode, HatContext, HatFace, HatTables, }; #[cfg(feature = "unstable")] use crate::key_expr::KeyExpr; @@ -44,8 +44,7 @@ use crate::net::routing::{ resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, - hat::{CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources}, - router::RoutesIndexes, + hat::{CurrentFutureTrait, DataRoutes, HatPubSubTrait, SendDeclare, Sources}, RoutingContext, }; @@ -986,8 +985,36 @@ impl HatPubSubTrait for HatCode { Arc::new(route) } - fn get_data_routes_entries(&self, tables: &Tables) -> RoutesIndexes { - get_routes_entries(tables) + fn compute_data_routes( + &self, + tables: &Tables, + routes: &mut DataRoutes, + expr: &mut RoutingExpr, + ) { + let indexes = hat!(tables) + .linkstatepeers_net + .as_ref() + .unwrap() + .graph + .node_indices() + .map(|i| i.index() as NodeId) + .collect::>(); + let max_idx = indexes.iter().max().unwrap(); + routes + .routers + .resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new())); + routes + .peers + .resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new())); + for idx in indexes { + let route = self.compute_data_route(tables, expr, idx, WhatAmI::Peer); + routes.routers[idx as usize] = route.clone(); + routes.peers[idx as usize] = route; + } + + routes.clients.resize_with(1, || { + self.compute_data_route(tables, expr, 0, WhatAmI::Client) + }); } #[zenoh_macros::unstable] diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index c122d369b5..760cfc9cf6 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -37,8 +37,8 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{ - face_hat, face_hat_mut, get_peer, get_routes_entries, hat, hat_mut, network::Network, res_hat, - res_hat_mut, HatCode, HatContext, HatFace, HatTables, + face_hat, face_hat_mut, get_peer, hat, hat_mut, network::Network, res_hat, res_hat_mut, + HatCode, HatContext, HatFace, HatTables, }; #[cfg(feature = "unstable")] use crate::key_expr::KeyExpr; @@ -49,8 +49,7 @@ use crate::net::routing::{ resource::{NodeId, Resource, SessionContext}, tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, }, - hat::{CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources}, - router::RoutesIndexes, + hat::{CurrentFutureTrait, HatQueriesTrait, QueryRoutes, SendDeclare, Sources}, RoutingContext, }; @@ -1029,8 +1028,36 @@ impl HatQueriesTrait for HatCode { Arc::new(route) } - fn get_query_routes_entries(&self, tables: &Tables) -> RoutesIndexes { - get_routes_entries(tables) + fn compute_query_routes( + &self, + tables: &Tables, + routes: &mut QueryRoutes, + expr: &mut RoutingExpr, + ) { + let indexes = hat!(tables) + .linkstatepeers_net + .as_ref() + .unwrap() + .graph + .node_indices() + .map(|i| i.index() as NodeId) + .collect::>(); + let max_idx = indexes.iter().max().unwrap(); + routes.routers.resize_with((*max_idx as usize) + 1, || { + Arc::new(QueryTargetQablSet::new()) + }); + routes.peers.resize_with((*max_idx as usize) + 1, || { + Arc::new(QueryTargetQablSet::new()) + }); + for idx in indexes { + let route = self.compute_query_route(tables, expr, idx, WhatAmI::Peer); + routes.routers[idx as usize] = route.clone(); + routes.peers[idx as usize] = route; + } + + routes.clients.resize_with(1, || { + self.compute_query_route(tables, expr, 0, WhatAmI::Client) + }); } #[cfg(feature = "unstable")] diff --git a/zenoh/src/net/routing/hat/mod.rs b/zenoh/src/net/routing/hat/mod.rs index 27b772b3fc..12116558e5 100644 --- a/zenoh/src/net/routing/hat/mod.rs +++ b/zenoh/src/net/routing/hat/mod.rs @@ -39,7 +39,7 @@ use super::{ pubsub::SubscriberInfo, tables::{NodeId, QueryTargetQablSet, Resource, Route, RoutingExpr, Tables, TablesLock}, }, - router::RoutesIndexes, + router::{DataRoutes, QueryRoutes}, RoutingContext, }; use crate::net::runtime::Runtime; @@ -191,7 +191,7 @@ pub(crate) trait HatPubSubTrait { source_type: WhatAmI, ) -> Arc; - fn get_data_routes_entries(&self, tables: &Tables) -> RoutesIndexes; + fn compute_data_routes(&self, tables: &Tables, routes: &mut DataRoutes, expr: &mut RoutingExpr); #[zenoh_macros::unstable] fn get_matching_subscriptions( @@ -235,7 +235,12 @@ pub(crate) trait HatQueriesTrait { source_type: WhatAmI, ) -> Arc; - fn get_query_routes_entries(&self, tables: &Tables) -> RoutesIndexes; + fn compute_query_routes( + &self, + tables: &Tables, + routes: &mut QueryRoutes, + expr: &mut RoutingExpr, + ); #[zenoh_macros::unstable] fn get_matching_queryables( diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 7e08a3c92c..a5fa389ec5 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -63,7 +63,7 @@ use crate::net::{ face::{Face, InterestState}, interests::RemoteInterest, }, - router::{compute_data_routes, compute_query_routes, RoutesIndexes}, + router::{compute_data_routes, compute_query_routes}, RoutingContext, }, runtime::Runtime, @@ -442,15 +442,6 @@ impl HatFace { impl HatTrait for HatCode {} -#[inline] -fn get_routes_entries() -> RoutesIndexes { - RoutesIndexes { - routers: vec![0], - peers: vec![0], - clients: vec![0], - } -} - // In p2p, at connection, while no interest is sent on the network, // peers act as if they received an interest CurrentFuture with id 0 // and send back a DeclareFinal with interest_id 0. diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 8acb12e216..a41a54fba9 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -29,7 +29,7 @@ use zenoh_protocol::{ }; use zenoh_sync::get_mut_unchecked; -use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace}; +use super::{face_hat, face_hat_mut, HatCode, HatFace}; use crate::{ key_expr::KeyExpr, net::routing::{ @@ -41,9 +41,10 @@ use crate::{ tables::{Route, RoutingExpr, Tables}, }, hat::{ - p2p_peer::initial_interest, CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources, + p2p_peer::initial_interest, CurrentFutureTrait, DataRoutes, HatPubSubTrait, + SendDeclare, Sources, }, - router::{update_data_routes_from, RoutesIndexes}, + router::update_data_routes_from, RoutingContext, }, }; @@ -692,8 +693,18 @@ impl HatPubSubTrait for HatCode { Arc::new(route) } - fn get_data_routes_entries(&self, _tables: &Tables) -> RoutesIndexes { - get_routes_entries() + fn compute_data_routes( + &self, + tables: &Tables, + routes: &mut DataRoutes, + expr: &mut RoutingExpr, + ) { + let route = self.compute_data_route(tables, expr, 0, WhatAmI::Peer); + routes.routers.resize_with(1, || route.clone()); + routes.peers.resize_with(1, || route.clone()); + routes.clients.resize_with(1, || { + self.compute_data_route(tables, expr, 0, WhatAmI::Client) + }); } #[zenoh_macros::unstable] diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 664822f6fe..6d2c558fc3 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -35,7 +35,7 @@ use zenoh_protocol::{ }; use zenoh_sync::get_mut_unchecked; -use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace}; +use super::{face_hat, face_hat_mut, HatCode, HatFace}; use crate::{ key_expr::KeyExpr, net::routing::{ @@ -45,9 +45,10 @@ use crate::{ tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, }, hat::{ - p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources, + p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, QueryRoutes, + SendDeclare, Sources, }, - router::{update_query_routes_from, RoutesIndexes}, + router::update_query_routes_from, RoutingContext, }, }; @@ -685,8 +686,17 @@ impl HatQueriesTrait for HatCode { Arc::new(route) } - fn get_query_routes_entries(&self, _tables: &Tables) -> RoutesIndexes { - get_routes_entries() + fn compute_query_routes( + &self, + tables: &Tables, + routes: &mut QueryRoutes, + expr: &mut RoutingExpr, + ) { + let route = self.compute_query_route(tables, expr, 0, WhatAmI::Peer); + routes.routers.resize_with(1, || route.clone()); + routes.peers.resize_with(1, || route.clone()); + let route = self.compute_query_route(tables, expr, 0, WhatAmI::Client); + routes.clients.resize_with(1, || route.clone()); } #[cfg(feature = "unstable")] diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index 9300c29a90..1ff8d858d2 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -59,7 +59,7 @@ use crate::net::{ routing::{ dispatcher::{face::Face, interests::RemoteInterest}, hat::TREES_COMPUTATION_DELAY_MS, - router::{compute_data_routes, compute_query_routes, RoutesIndexes}, + router::{compute_data_routes, compute_query_routes}, }, runtime::Runtime, }; @@ -947,32 +947,3 @@ fn get_peer(tables: &Tables, face: &Arc, nodeid: NodeId) -> Option RoutesIndexes { - let routers_indexes = hat!(tables) - .routers_net - .as_ref() - .unwrap() - .graph - .node_indices() - .map(|i| i.index() as NodeId) - .collect::>(); - let peers_indexes = if hat!(tables).full_net(WhatAmI::Peer) { - hat!(tables) - .linkstatepeers_net - .as_ref() - .unwrap() - .graph - .node_indices() - .map(|i| i.index() as NodeId) - .collect::>() - } else { - vec![0] - }; - RoutesIndexes { - routers: routers_indexes, - peers: peers_indexes, - clients: vec![0], - } -} diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index 9cf07db2a2..4a77a850f1 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -31,7 +31,7 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{ - face_hat, face_hat_mut, get_peer, get_router, get_routes_entries, hat, hat_mut, + face_hat, face_hat_mut, get_peer, get_router, hat, hat_mut, interests::push_declaration_profile, network::Network, res_hat, res_hat_mut, HatCode, HatContext, HatFace, HatTables, }; @@ -45,8 +45,7 @@ use crate::net::routing::{ resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, - hat::{CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources}, - router::RoutesIndexes, + hat::{CurrentFutureTrait, DataRoutes, HatPubSubTrait, SendDeclare, Sources}, RoutingContext, }; @@ -1334,8 +1333,55 @@ impl HatPubSubTrait for HatCode { Arc::new(route) } - fn get_data_routes_entries(&self, tables: &Tables) -> RoutesIndexes { - get_routes_entries(tables) + fn compute_data_routes( + &self, + tables: &Tables, + routes: &mut DataRoutes, + expr: &mut RoutingExpr, + ) { + let routers_indexes = hat!(tables) + .routers_net + .as_ref() + .unwrap() + .graph + .node_indices() + .map(|i| i.index() as NodeId) + .collect::>(); + let max_idx = routers_indexes.iter().max().unwrap(); + routes + .routers + .resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new())); + + for idx in routers_indexes { + routes.routers[idx as usize] = + self.compute_data_route(tables, expr, idx, WhatAmI::Router); + } + + if hat!(tables).full_net(WhatAmI::Peer) { + let peers_indexes = hat!(tables) + .linkstatepeers_net + .as_ref() + .unwrap() + .graph + .node_indices() + .map(|i| i.index() as NodeId) + .collect::>(); + let max_idx = peers_indexes.iter().max().unwrap(); + routes + .peers + .resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new())); + for idx in peers_indexes { + routes.peers[idx as usize] = + self.compute_data_route(tables, expr, idx, WhatAmI::Peer); + } + } else { + routes.peers.resize_with(1, || { + self.compute_data_route(tables, expr, 0, WhatAmI::Peer) + }); + }; + routes.clients.resize_with(1, || { + self.compute_data_route(tables, expr, 0, WhatAmI::Client) + }); } #[zenoh_macros::unstable] diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 97a772a9df..6dbdc8ac23 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -37,7 +37,7 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{ - face_hat, face_hat_mut, get_peer, get_router, get_routes_entries, hat, hat_mut, + face_hat, face_hat_mut, get_peer, get_router, hat, hat_mut, interests::push_declaration_profile, network::Network, res_hat, res_hat_mut, HatCode, HatContext, HatFace, HatTables, }; @@ -50,8 +50,7 @@ use crate::net::routing::{ resource::{NodeId, Resource, SessionContext}, tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, }, - hat::{CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources}, - router::RoutesIndexes, + hat::{CurrentFutureTrait, HatQueriesTrait, QueryRoutes, SendDeclare, Sources}, RoutingContext, }; @@ -1512,8 +1511,57 @@ impl HatQueriesTrait for HatCode { Arc::new(route) } - fn get_query_routes_entries(&self, tables: &Tables) -> RoutesIndexes { - get_routes_entries(tables) + fn compute_query_routes( + &self, + tables: &Tables, + routes: &mut QueryRoutes, + expr: &mut RoutingExpr, + ) { + let routers_indexes = hat!(tables) + .routers_net + .as_ref() + .unwrap() + .graph + .node_indices() + .map(|i| i.index() as NodeId) + .collect::>(); + let max_idx = routers_indexes.iter().max().unwrap(); + routes.routers.resize_with((*max_idx as usize) + 1, || { + Arc::new(QueryTargetQablSet::new()) + }); + + for idx in routers_indexes { + routes.routers[idx as usize] = + self.compute_query_route(tables, expr, idx, WhatAmI::Router); + } + + if hat!(tables).full_net(WhatAmI::Peer) { + let peers_indexes = hat!(tables) + .linkstatepeers_net + .as_ref() + .unwrap() + .graph + .node_indices() + .map(|i| i.index() as NodeId) + .collect::>(); + let max_idx = peers_indexes.iter().max().unwrap(); + routes.peers.resize_with((*max_idx as usize) + 1, || { + Arc::new(QueryTargetQablSet::new()) + }); + for idx in peers_indexes { + routes.peers[idx as usize] = + self.compute_query_route(tables, expr, idx, WhatAmI::Peer); + } + } else { + routes.peers.resize_with(1, || { + self.compute_query_route(tables, expr, 0, WhatAmI::Peer) + }); + }; + routes.clients.resize_with(1, || { + tables + .hat_code + .compute_query_route(tables, expr, 0, WhatAmI::Client) + }); } #[cfg(feature = "unstable")]