Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add QoS configuration overwrites for publication messages #1613

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3489a1f
Initial implementation of publisher builder config overwrites
oteffahi Nov 26, 2024
c8083f7
Add publisher builder overwrite test
oteffahi Nov 26, 2024
e45bd6f
Remove unnecessary return
oteffahi Nov 26, 2024
1d695db
Fix default config spacing
oteffahi Nov 27, 2024
0d9d145
Change config format to publishers/default_builders
oteffahi Nov 27, 2024
3fc0492
Update QoS test config
oteffahi Nov 27, 2024
e7f7fa6
Move new PublisherBuilder logic
oteffahi Nov 27, 2024
9eca040
Log warning when multiple builder configs can apply for the same keyexpr
oteffahi Nov 27, 2024
95c8cd3
Rename builders module to publishers
oteffahi Nov 28, 2024
ea15fd8
Change config format to qos/put
oteffahi Nov 29, 2024
eb8645f
Rename publishers module to qos
oteffahi Nov 29, 2024
bd1bf11
Get QoS config from runtime instead of drilling through session API
oteffahi Nov 29, 2024
dafadcc
Update QoS config test
oteffahi Nov 29, 2024
b6b0c0f
Merge branch 'main' into qos/pub-builders-config
oteffahi Nov 29, 2024
2d869ae
Remove unnecessary lifetime
oteffahi Nov 29, 2024
9ca7467
Overwrite PublisherBuilder API calls with config
oteffahi Dec 2, 2024
e37407c
Update QoS config test
oteffahi Dec 2, 2024
2ec8a0f
Remove encoding from QoS config
oteffahi Dec 2, 2024
097d194
Allow API to change non-overwritten QoS config
oteffahi Dec 2, 2024
028c8fa
Rename config qos/put to qos/publication
oteffahi Dec 2, 2024
58ec7bd
Enforce 1-to-1 mapping between config and zenoh enums at compile time
oteffahi Dec 2, 2024
c0ef9d1
Add session API tests
oteffahi Dec 2, 2024
77b9929
Fix case-sensitive parameter
oteffahi Dec 2, 2024
02fcd2e
Mark destination QoS config as unstable
oteffahi Dec 2, 2024
15273a7
Apply QoS overwrites at resolution of PublisherBuilder and Publicatio…
oteffahi Dec 3, 2024
2cab47a
Merge branch 'main' into qos/pub-builders-config
oteffahi Dec 3, 2024
f74fb18
Remove impl lifetimes
oteffahi Dec 3, 2024
1d6af81
Remove unicity check of keyexpr in qos config
oteffahi Dec 3, 2024
a047902
Add copyright header
oteffahi Dec 3, 2024
69e0608
Merge branch 'main' into qos/pub-builders-config
oteffahi Dec 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,22 @@
},
},

// /// Overwrite default builders for specific key expressions
oteffahi marked this conversation as resolved.
Show resolved Hide resolved
// builders: {
// /// A list of publisher builder configurations for specific key expressions.
// publishers: [
// // key_expressions
// key_exprs: ["demo/**", "example/key"],
// // builder configuration
// congestion_control: "Block",
// encoding: "zenoh/example",
// priority: "data_high",
// express: true,
// reliability: "best_effort",
// allowed_destination: "remote",
// ],
// },

// /// The declarations aggregation strategy.
// aggregation: {
// /// A list of key-expressions for which all included subscribers will be aggregated into.
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ description = "Internal crate for zenoh."

[features]
internal = []
unstable = []

[dependencies]
tracing = { workspace = true }
Expand All @@ -35,6 +36,7 @@ serde_json = { workspace = true }
serde_yaml = { workspace = true }
validated_struct = { workspace = true, features = ["json5", "json_get"] }
zenoh-core = { workspace = true }
zenoh-keyexpr = { workspace = true }
zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-util = { workspace = true }
Expand Down
127 changes: 127 additions & 0 deletions commons/zenoh-config/src/builders.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use std::collections::HashSet;

use serde::{Deserialize, Deserializer, Serialize};
pub use validated_struct::{GetError, ValidatedMap};
use zenoh_keyexpr::keyexpr_tree::{IKeyExprTreeMut, KeBoxTree};
use zenoh_protocol::core::{key_expr::OwnedKeyExpr, CongestionControl, Reliability};
pub use zenoh_protocol::core::{
whatami, EndPoint, Locator, WhatAmI, WhatAmIMatcher, WhatAmIMatcherVisitor,
};

#[derive(Debug, Deserialize, Default, Serialize, Clone)]
#[serde(remote = "Self")]
pub struct PublisherBuildersConf(pub(crate) Vec<PublisherBuildersInnerConf>);

impl<'de> Deserialize<'de> for PublisherBuildersConf {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let builders = PublisherBuildersConf::deserialize(deserializer)?;
// check for invariant: each key_expr should be unique
oteffahi marked this conversation as resolved.
Show resolved Hide resolved
let mut key_set = HashSet::new();
for builder in &builders.0 {
for key_expr in &builder.key_exprs {
if !key_set.insert(key_expr) {
return Err(format!(
"duplicated key_expr '{key_expr}' found in publisher builders config"
))
.map_err(serde::de::Error::custom);
}
}
}
Ok(builders)
}
}

impl Serialize for PublisherBuildersConf {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
PublisherBuildersConf::serialize(self, serializer)
}
}

impl From<PublisherBuildersConf> for KeBoxTree<PublisherBuilderOptionsConf> {
fn from(value: PublisherBuildersConf) -> KeBoxTree<PublisherBuilderOptionsConf> {
let mut tree = KeBoxTree::new();
for conf in value.0 {
for key_expr in conf.key_exprs {
// key_expr unicity is checked at deserialization
tree.insert(&key_expr, conf.builder_conf.clone());
}
}
tree
}
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub(crate) struct PublisherBuildersInnerConf {
pub key_exprs: Vec<OwnedKeyExpr>,
#[serde(flatten)]
pub builder_conf: PublisherBuilderOptionsConf,
}

#[derive(Debug, Default, Deserialize, Serialize, Clone)]
pub struct PublisherBuilderOptionsConf {
pub congestion_control: Option<PublisherCongestionControlConf>,
pub encoding: Option<String>, // Encoding has From<&str>
pub priority: Option<PublisherPriorityConf>,
pub express: Option<bool>,
#[cfg(feature = "unstable")]
pub reliability: Option<PublisherReliabilityConf>,
pub allowed_destination: Option<PublisherLocalityConf>,
}

#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
#[serde(rename_all = "lowercase")]
pub enum PublisherCongestionControlConf {
Drop,
Block,
}

impl From<PublisherCongestionControlConf> for CongestionControl {
fn from(value: PublisherCongestionControlConf) -> Self {
match value {
PublisherCongestionControlConf::Drop => CongestionControl::Drop,
PublisherCongestionControlConf::Block => CongestionControl::Block,
}
}
}

#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
#[serde(rename_all = "snake_case")]
pub enum PublisherPriorityConf {
RealTime = 1,
InteractiveHigh = 2,
InteractiveLow = 3,
DataHigh = 4,
Data = 5,
DataLow = 6,
Background = 7,
}

#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
#[serde(rename_all = "snake_case")]
pub enum PublisherReliabilityConf {
BestEffort,
Reliable,
}

impl From<PublisherReliabilityConf> for Reliability {
fn from(value: PublisherReliabilityConf) -> Reliability {
match value {
PublisherReliabilityConf::BestEffort => Reliability::BestEffort,
PublisherReliabilityConf::Reliable => Reliability::Reliable,
}
}
}

#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
#[serde(rename_all = "snake_case")]
pub enum PublisherLocalityConf {
SessionLocal,
Remote,
Any,
}
10 changes: 10 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh)
//!
//! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants.
pub mod builders;
pub mod defaults;
mod include;
pub mod wrappers;
Expand All @@ -29,6 +30,7 @@ use std::{
any::Any, collections::HashSet, fmt, io::Read, net::SocketAddr, ops, path::Path, sync::Weak,
};

use builders::PublisherBuildersConf;
use include::recursive_include;
use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -360,6 +362,14 @@ validated_struct::validator! {
/// A list of key-expressions for which all included publishers will be aggregated into.
publishers: Vec<OwnedKeyExpr>,
},

/// Overwrite default builders for specific key expressions
pub builders: #[derive(Default)]
BuildersConf {
/// A list of publisher builder configurations for specific key expressions.
publishers: PublisherBuildersConf,
},

pub transport: #[derive(Default)]
TransportConf {
pub unicast: TransportUnicastConf {
Expand Down
2 changes: 1 addition & 1 deletion zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ transport_udp = ["zenoh-transport/transport_udp"]
transport_unixsock-stream = ["zenoh-transport/transport_unixsock-stream"]
transport_ws = ["zenoh-transport/transport_ws"]
transport_vsock = ["zenoh-transport/transport_vsock"]
unstable = ["internal_config", "zenoh-keyexpr/unstable"]
unstable = ["internal_config", "zenoh-keyexpr/unstable", "zenoh-config/unstable"]
internal_config = []

[dependencies]
Expand Down
10 changes: 10 additions & 0 deletions zenoh/src/api/builders/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::future::{IntoFuture, Ready};
#[cfg(feature = "shared-memory")]
use std::sync::Arc;

#[cfg(feature = "internal")]
use zenoh_config::builders::PublisherBuildersConf;
use zenoh_core::{Resolvable, Wait};
#[cfg(feature = "internal")]
use zenoh_keyexpr::OwnedKeyExpr;
Expand Down Expand Up @@ -122,6 +124,7 @@ pub fn init(runtime: Runtime) -> InitBuilder {
runtime,
aggregated_subscribers: vec![],
aggregated_publishers: vec![],
publisher_builders: PublisherBuildersConf::default(),
}
}

Expand All @@ -133,6 +136,7 @@ pub struct InitBuilder {
runtime: Runtime,
aggregated_subscribers: Vec<OwnedKeyExpr>,
aggregated_publishers: Vec<OwnedKeyExpr>,
publisher_builders: PublisherBuildersConf,
}

#[zenoh_macros::internal]
Expand All @@ -148,6 +152,11 @@ impl InitBuilder {
self.aggregated_publishers = exprs;
self
}

pub fn publisher_builders(mut self, builder_conf: PublisherBuildersConf) -> Self {
self.publisher_builders = builder_conf;
self
}
}

#[zenoh_macros::internal]
Expand All @@ -162,6 +171,7 @@ impl Wait for InitBuilder {
self.runtime,
self.aggregated_subscribers,
self.aggregated_publishers,
self.publisher_builders,
false,
)
.wait())
Expand Down
15 changes: 15 additions & 0 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{

use futures::Sink;
use tracing::error;
use zenoh_config::builders::PublisherPriorityConf;
use zenoh_core::{Resolvable, Resolve, Wait};
use zenoh_protocol::core::CongestionControl;
use zenoh_result::{Error, ZResult};
Expand Down Expand Up @@ -469,6 +470,20 @@ impl TryFrom<u8> for Priority {
}
}

impl From<PublisherPriorityConf> for Priority {
fn from(value: PublisherPriorityConf) -> Self {
match value {
PublisherPriorityConf::RealTime => Priority::RealTime,
PublisherPriorityConf::InteractiveHigh => Priority::InteractiveHigh,
PublisherPriorityConf::InteractiveLow => Priority::InteractiveLow,
PublisherPriorityConf::DataHigh => Priority::DataHigh,
PublisherPriorityConf::Data => Priority::Data,
PublisherPriorityConf::DataLow => Priority::DataLow,
PublisherPriorityConf::Background => Priority::Background,
}
}
}

type ProtocolPriority = zenoh_protocol::core::Priority;
impl From<Priority> for ProtocolPriority {
fn from(prio: Priority) -> Self {
Expand Down
12 changes: 11 additions & 1 deletion zenoh/src/api/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use std::{convert::TryFrom, fmt};

use serde::{Deserialize, Serialize};
use zenoh_config::wrappers::EntityGlobalId;
use zenoh_config::{builders::PublisherLocalityConf, wrappers::EntityGlobalId};
#[cfg(feature = "unstable")]
use zenoh_protocol::core::Reliability;
use zenoh_protocol::{
Expand Down Expand Up @@ -50,6 +50,16 @@ pub(crate) enum Locality {
Any,
}

impl From<PublisherLocalityConf> for Locality {
fn from(value: PublisherLocalityConf) -> Self {
match value {
PublisherLocalityConf::SessionLocal => Locality::SessionLocal,
PublisherLocalityConf::Remote => Locality::Remote,
PublisherLocalityConf::Any => Locality::Any,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub(crate) struct DataInfo {
pub kind: SampleKind,
Expand Down
Loading