Skip to content

Commit

Permalink
Fix bug in Query Timeout (#1021)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart authored May 13, 2024
1 parent b8dd01d commit 45e05f0
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 18 deletions.
5 changes: 3 additions & 2 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ impl Primitives for Face {
&self.state,
&msg.wire_expr,
msg.ext_qos,
msg.ext_tstamp,
msg.payload,
msg.ext_nodeid.node_id,
);
Expand All @@ -260,10 +261,10 @@ impl Primitives for Face {
&self.tables,
&self.state,
&msg.wire_expr,
// parameters,
msg.id,
msg.ext_target,
// consolidation,
msg.ext_budget,
msg.ext_timeout,
msg.payload,
msg.ext_nodeid.node_id,
);
Expand Down
7 changes: 4 additions & 3 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ pub fn full_reentrant_route_data(
face: &FaceState,
expr: &WireExpr,
ext_qos: ext::QoSType,
ext_tstamp: Option<ext::TimestampType>,
mut payload: PushBody,
routing_context: NodeId,
) {
Expand Down Expand Up @@ -478,7 +479,7 @@ pub fn full_reentrant_route_data(
outface.primitives.send_push(Push {
wire_expr: key_expr.into(),
ext_qos,
ext_tstamp: None,
ext_tstamp,
ext_nodeid: ext::NodeIdType { node_id: *context },
payload,
})
Expand Down Expand Up @@ -513,7 +514,7 @@ pub fn full_reentrant_route_data(
outface.primitives.send_push(Push {
wire_expr: key_expr,
ext_qos,
ext_tstamp: None,
ext_tstamp,
ext_nodeid: ext::NodeIdType { node_id: context },
payload: payload.clone(),
})
Expand All @@ -540,7 +541,7 @@ pub fn full_reentrant_route_data(
outface.primitives.send_push(Push {
wire_expr: key_expr.into(),
ext_qos,
ext_tstamp: None,
ext_tstamp,
ext_nodeid: ext::NodeIdType { node_id: *context },
payload: payload.clone(),
})
Expand Down
64 changes: 51 additions & 13 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,21 @@ use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use zenoh_buffers::ZBuf;
use zenoh_config::WhatAmI;
use zenoh_protocol::core::key_expr::keyexpr;
use zenoh_protocol::core::KnownEncoding;
use zenoh_protocol::network::declare::queryable::ext::QueryableInfo;
use zenoh_protocol::zenoh;
use zenoh_protocol::zenoh::ext::ValueType;
use zenoh_protocol::{
core::{Encoding, WireExpr},
network::{
declare::ext,
request::{ext::TargetType, Request, RequestId},
request::{
ext::{BudgetType, TargetType, TimeoutType},
Request, RequestId,
},
response::{self, ext::ResponderIdType, Response, ResponseFinal},
},
zenoh::{reply::ext::ConsolidationType, Reply, RequestBody, ResponseBody},
Expand Down Expand Up @@ -365,6 +372,7 @@ struct QueryCleanup {
tables: Arc<TablesLock>,
face: Weak<FaceState>,
qid: RequestId,
timeout: Duration,
}

impl QueryCleanup {
Expand All @@ -378,6 +386,7 @@ impl QueryCleanup {
tables: tables_ref.clone(),
face: Arc::downgrade(face),
qid,
timeout,
};
if let Some((_, cancellation_token)) = face.pending_queries.get(&qid) {
let c_cancellation_token = cancellation_token.clone();
Expand All @@ -396,17 +405,42 @@ impl QueryCleanup {
impl Timed for QueryCleanup {
async fn run(&mut self) {
if let Some(mut face) = self.face.upgrade() {
let tables_lock = zwrite!(self.tables.tables);
let ext_respid = Some(response::ext::ResponderIdType {
zid: face.zid,
eid: 0,
});
route_send_response(
&self.tables,
&mut face,
self.qid,
ext_respid,
WireExpr::empty(),
ResponseBody::Err(zenoh::Err {
timestamp: None,
is_infrastructure: false,
ext_sinfo: None,
ext_unknown: vec![],
ext_body: Some(ValueType {
#[cfg(feature = "shared-memory")]
ext_shm: None,
payload: ZBuf::from("Timeout".as_bytes().to_vec()),
encoding: KnownEncoding::TextPlain.into(),
}),
code: 0, // TODO
}),
);
let queries_lock = zwrite!(self.tables.queries_lock);
if let Some(query) = get_mut_unchecked(&mut face)
.pending_queries
.remove(&self.qid)
{
drop(tables_lock);
drop(queries_lock);
tracing::warn!(
"Didn't receive final reply {}:{} from {}: Timeout!",
"Didn't receive final reply {}:{} from {}: Timeout({:#?})!",
query.0.src_face,
self.qid,
face
face,
self.timeout,
);
finalize_pending_query(query);
}
Expand Down Expand Up @@ -513,12 +547,15 @@ macro_rules! inc_res_stats {
};
}

#[allow(clippy::too_many_arguments)]
pub fn route_query(
tables_ref: &Arc<TablesLock>,
face: &Arc<FaceState>,
expr: &WireExpr,
qid: RequestId,
target: TargetType,
ext_target: TargetType,
ext_budget: Option<BudgetType>,
ext_timeout: Option<TimeoutType>,
body: RequestBody,
routing_context: NodeId,
) {
Expand Down Expand Up @@ -555,14 +592,15 @@ pub fn route_query(
});

let queries_lock = zwrite!(tables_ref.queries_lock);
let route = compute_final_route(&rtables, &route, face, &mut expr, &target, query);
let route =
compute_final_route(&rtables, &route, face, &mut expr, &ext_target, query);
let local_replies =
rtables
.hat_code
.compute_local_replies(&rtables, &prefix, expr.suffix, face);
let zid = rtables.zid;

let timeout = rtables.queries_default_timeout;
let timeout = ext_timeout.unwrap_or(rtables.queries_default_timeout);

drop(queries_lock);
drop(rtables);
Expand Down Expand Up @@ -643,8 +681,8 @@ pub fn route_query(
ext_tstamp: None,
ext_nodeid: ext::NodeIdType { node_id: *context },
ext_target: *t,
ext_budget: None,
ext_timeout: None,
ext_budget,
ext_timeout,
payload: body.clone(),
},
expr.full_expr().to_string(),
Expand Down Expand Up @@ -673,9 +711,9 @@ pub fn route_query(
ext_qos: ext::QoSType::request_default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType { node_id: *context },
ext_target: target,
ext_budget: None,
ext_timeout: None,
ext_target,
ext_budget,
ext_timeout,
payload: body.clone(),
},
expr.full_expr().to_string(),
Expand Down
5 changes: 5 additions & 0 deletions zenoh/src/net/tests/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ fn client_test() {
&face0.upgrade().unwrap(),
&"test/client/z1_wr1".into(),
ext::QoSType::default(),
None,
PushBody::Put(Put {
timestamp: None,
encoding: Encoding::default(),
Expand Down Expand Up @@ -659,6 +660,7 @@ fn client_test() {
&face0.upgrade().unwrap(),
&WireExpr::from(11).with_suffix("/z1_wr2"),
ext::QoSType::default(),
None,
PushBody::Put(Put {
timestamp: None,
encoding: Encoding::default(),
Expand Down Expand Up @@ -692,6 +694,7 @@ fn client_test() {
&face1.upgrade().unwrap(),
&"test/client/**".into(),
ext::QoSType::default(),
None,
PushBody::Put(Put {
timestamp: None,
encoding: Encoding::default(),
Expand Down Expand Up @@ -725,6 +728,7 @@ fn client_test() {
&face0.upgrade().unwrap(),
&12.into(),
ext::QoSType::default(),
None,
PushBody::Put(Put {
timestamp: None,
encoding: Encoding::default(),
Expand Down Expand Up @@ -758,6 +762,7 @@ fn client_test() {
&face1.upgrade().unwrap(),
&22.into(),
ext::QoSType::default(),
None,
PushBody::Put(Put {
timestamp: None,
encoding: Encoding::default(),
Expand Down

0 comments on commit 45e05f0

Please sign in to comment.