From 43da5707a72b8d8c58bd73bfeb07ee2e6b2b3270 Mon Sep 17 00:00:00 2001 From: Peter LeVasseur Date: Wed, 26 Jun 2024 15:12:27 -0400 Subject: [PATCH] Adding remaining relevant initial configurations --- up-linux-streamer-plugin/DEFAULT_CONFIG.json5 | 13 +- up-linux-streamer-plugin/src/config.rs | 17 ++ up-linux-streamer-plugin/src/lib.rs | 192 +++++++----------- 3 files changed, 102 insertions(+), 120 deletions(-) diff --git a/up-linux-streamer-plugin/DEFAULT_CONFIG.json5 b/up-linux-streamer-plugin/DEFAULT_CONFIG.json5 index 62f7262c..14575db6 100644 --- a/up-linux-streamer-plugin/DEFAULT_CONFIG.json5 +++ b/up-linux-streamer-plugin/DEFAULT_CONFIG.json5 @@ -1,16 +1,23 @@ //// -//// This file presents the default configuration used by both the `zenoh-plugin-mqtt` plugin and the `zenoh-bridge-mqtt` standalone executable. -//// The "mqtt" JSON5 object below can be used as such in the "plugins" part of a config file for the zenoh router (zenohd). +//// This file presents the default configuration used by both the `up-linux-streamer-plugin` plugin. +//// The "up_linux_streamer" JSON5 object below can be used as such in the "plugins" part of a config file for the zenoh router (zenohd). //// { plugins: { up_linux_streamer: { __required__: true, + up_streamer_config: { + message_queue_size: 10000 + }, host_config: { + transport: "Zenoh", authority: "linux" }, someip_config: { - authority: "me_authority" + authority: "me_authority", + config_file: "vsomeip-configs/point_to_point.json", + default_someip_application_id_for_someip_subscriptions: 10, + enabled: true }, } } diff --git a/up-linux-streamer-plugin/src/config.rs b/up-linux-streamer-plugin/src/config.rs index 7b84094a..bf7f82c1 100644 --- a/up-linux-streamer-plugin/src/config.rs +++ b/up-linux-streamer-plugin/src/config.rs @@ -1,16 +1,25 @@ use serde::{Deserialize, Serialize}; +use std::path::PathBuf; #[derive(Deserialize, Serialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct Config { pub(crate) __required__: bool, + pub(crate) up_streamer_config: UpStreamerConfig, pub(crate) host_config: HostConfig, pub(crate) someip_config: SomeipConfig, } +#[derive(Deserialize, Serialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct UpStreamerConfig { + pub(crate) message_queue_size: u16, +} + #[derive(Deserialize, Serialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct HostConfig { + pub(crate) transport: HostTransport, pub(crate) authority: String, } @@ -18,4 +27,12 @@ pub struct HostConfig { #[serde(deny_unknown_fields)] pub struct SomeipConfig { pub(crate) authority: String, + pub(crate) config_file: PathBuf, + pub(crate) default_someip_application_id_for_someip_subscriptions: u16, + pub(crate) enabled: bool, +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub enum HostTransport { + Zenoh, } diff --git a/up-linux-streamer-plugin/src/lib.rs b/up-linux-streamer-plugin/src/lib.rs index 8d50198b..4d93ef62 100644 --- a/up-linux-streamer-plugin/src/lib.rs +++ b/up-linux-streamer-plugin/src/lib.rs @@ -1,7 +1,11 @@ #![recursion_limit = "256"] +// TODO: Consider if we're ever likely to want to use this _not_ as a Zenoh plugin, in which +// case the config module should be made pub and we should add validation on top of setting +// its members mod config; +use crate::config::HostTransport; use async_std::task; use config::Config; use futures::select; @@ -31,18 +35,14 @@ use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginCont use zenoh_result::{bail, zerror, ZResult}; // The struct implementing the ZenohPlugin and ZenohPlugin traits -pub struct ExamplePlugin {} +pub struct UpLinuxStreamerPlugin {} // declaration of the plugin's VTable for zenohd to find the plugin's functions to be called #[cfg(feature = "dynamic_plugin")] -zenoh_plugin_trait::declare_plugin!(ExamplePlugin); +zenoh_plugin_trait::declare_plugin!(UpLinuxStreamerPlugin); -// A default selector for this example of storage plugin (in case the config doesn't set it) -// This plugin will subscribe to this selector and declare a queryable with this selector -const DEFAULT_SELECTOR: &str = "demo/example/**"; - -impl ZenohPlugin for ExamplePlugin {} -impl Plugin for ExamplePlugin { +impl ZenohPlugin for UpLinuxStreamerPlugin {} +impl Plugin for UpLinuxStreamerPlugin { type StartArgs = Runtime; type Instance = zenoh::plugins::RunningPlugin; @@ -56,7 +56,6 @@ impl Plugin for ExamplePlugin { zenoh_util::try_init_log_from_env(); trace!("up-linux-streamer-plugin: start"); - trace!("Attempting to do same style as mqtt plugin for reading config"); let runtime_conf = runtime.config().lock(); let plugin_conf = runtime_conf .plugin(name) @@ -79,7 +78,6 @@ impl Plugin for ExamplePlugin { name: name.into(), runtime: runtime.clone(), })))); - trace!("up-linux-streamer-plugin: after creating RunningPlugin"); Ok(ret) @@ -101,48 +99,12 @@ impl PluginControl for RunningPlugin {} impl RunningPluginTrait for RunningPlugin { fn config_checker( &self, - path: &str, - old: &serde_json::Map, - new: &serde_json::Map, + _path: &str, + _old: &serde_json::Map, + _new: &serde_json::Map, ) -> ZResult>> { - let mut guard = zlock!(&self.0); - const STORAGE_SELECTOR: &str = "storage-selector"; - if path == STORAGE_SELECTOR || path.is_empty() { - match (old.get(STORAGE_SELECTOR), new.get(STORAGE_SELECTOR)) { - (Some(serde_json::Value::String(os)), Some(serde_json::Value::String(ns))) - if os == ns => {} - (_, Some(serde_json::Value::String(selector))) => { - guard.flag.store(false, Relaxed); - guard.flag = Arc::new(AtomicBool::new(true)); - match KeyExpr::try_from(selector.clone()) { - Err(e) => tracing::error!("{}", e), - Ok(selector) => { - // TODO: Figure out the role of this config_checker - // async_std::task::spawn(run( - // guard.runtime.clone(), - // selector, - // guard.flag.clone(), - // )); - } - } - return Ok(None); - } - (_, None) => { - guard.flag.store(false, Relaxed); - } - _ => { - bail!( - "up-linux-streamer-plugin: storage-selector for {} must be a string", - &guard.name - ) - } - } - } - bail!( - "up-linux-streamer-plugin: unknown option {} for {}", - path, - guard.name - ) + // TODO: Learn more about how the config_checker is used + return Ok(None); } } @@ -164,84 +126,80 @@ async fn run(runtime: Runtime, config: Config, flag: Arc) { env_logger::init(); - let mut streamer = UStreamer::new("up-linux-streamer", 10000); - - let exe_path = match env::current_exe() { - Ok(exe_path) => { - if let Some(exe_dir) = exe_path.parent() { - println!("The binary is located in: {}", exe_dir.display()); - exe_path - } else { - panic!("Failed to determine the directory of the executable."); - } - } - Err(e) => { - panic!("Failed to get the executable path: {}", e); - } - }; - tracing::log::trace!("exe_path: {exe_path:?}"); - let exe_path_parent = exe_path.parent(); - let Some(exe_path_parent) = exe_path_parent else { - panic!("Unable to get parent path"); - }; - tracing::log::trace!("exe_path_parent: {exe_path_parent:?}"); - - // TODO: Make configurable to pass the path to the vsomeip config as a command line argument - let vsomeip_config = PathBuf::from(exe_path_parent).join("vsomeip-configs/point_to_point.json"); - tracing::log::trace!("vsomeip_config: {vsomeip_config:?}"); - let vsomeip_config = canonicalize(vsomeip_config).ok(); - tracing::log::trace!("vsomeip_config: {vsomeip_config:?}"); - - // There will be a single vsomeip_transport, as there is a connection into device and a streamer - // TODO: Add error handling if we fail to create a UPTransportVsomeip - let vsomeip_transport: Arc = Arc::new( - UPTransportVsomeip::new_with_config(&"linux".to_string(), 10, &vsomeip_config.unwrap()) - .unwrap(), + let mut streamer = UStreamer::new( + "up-linux-streamer", + config.up_streamer_config.message_queue_size, ); - // TODO: Probably make somewhat configurable? - // TODO: Add error handling if we fail to create a UPClientZenoh - let zenoh_transport: Arc = Arc::new( - UPClientZenoh::new_with_runtime(runtime.clone(), config.host_config.authority) - .await - .unwrap(), - ); - // TODO: Make configurable to pass the name of the mE authority as a command line argument - let vsomeip_endpoint = Endpoint::new( - "vsomeip_endpoint", - &config.someip_config.authority, - vsomeip_transport.clone(), + let host_transport: Arc = Arc::new(match config.host_config.transport { + HostTransport::Zenoh => { + UPClientZenoh::new_with_runtime(runtime.clone(), config.host_config.authority.clone()) + .await + .expect("Unable to initialize Zenoh UTransport") + } // other host transports can be added here as they become available + }); + + let host_endpoint = Endpoint::new( + "host_endpoint", + &config.host_config.authority, + host_transport.clone(), ); - // TODO: Make configurable the ability to have perhaps a config file we pass in that has all the - // relevant authorities over Zenoh that should be forwarded - let zenoh_transport_endpoint_a = Endpoint::new( - "zenoh_transport_endpoint_a", - "linux", // simple initial case of streamer + intended high compute destination on same device - zenoh_transport.clone(), - ); - - // TODO: Per Zenoh endpoint configured, run these two rules - let forwarding_res = streamer - .add_forwarding_rule(vsomeip_endpoint.clone(), zenoh_transport_endpoint_a.clone()) - .await; + if config.someip_config.enabled { + let someip_config_file_abs_path = if config.someip_config.config_file.is_relative() { + env::current_dir() + .unwrap() + .join(&config.someip_config.config_file) + } else { + config.someip_config.config_file + }; + tracing::log::trace!("someip_config_file_abs_path: {someip_config_file_abs_path:?}"); + if !someip_config_file_abs_path.exists() { + panic!( + "The specified someip config_file doesn't exist: {someip_config_file_abs_path:?}" + ); + } - if let Err(err) = forwarding_res { - error!("Unable to add forwarding result: {err:?}"); - } + // There will be at most one vsomeip_transport, as there is a connection into device and a streamer + let someip_transport: Arc = Arc::new( + // TODO: Work out changes s.t. the authority given is config.someip_config.authority + // This feels off + UPTransportVsomeip::new_with_config( + &config.host_config.authority, + config + .someip_config + .default_someip_application_id_for_someip_subscriptions, + &someip_config_file_abs_path, + ) + .expect("Unable to initialize vsomeip UTransport"), + ); + + let mechatronics_endpoint = Endpoint::new( + "mechatronics_endpoint", + &config.someip_config.authority, + someip_transport.clone(), + ); + let forwarding_res = streamer + .add_forwarding_rule(mechatronics_endpoint.clone(), host_endpoint.clone()) + .await; + + if let Err(err) = forwarding_res { + panic!("Unable to add forwarding result: {err:?}"); + } - let forwarding_res = streamer - .add_forwarding_rule(zenoh_transport_endpoint_a.clone(), vsomeip_endpoint.clone()) - .await; + let forwarding_res = streamer + .add_forwarding_rule(host_endpoint.clone(), mechatronics_endpoint.clone()) + .await; - if let Err(err) = forwarding_res { - error!("Unable to add forwarding result: {err:?}"); + if let Err(err) = forwarding_res { + panic!("Unable to add forwarding result: {err:?}"); + } } // Plugin's event loop, while the flag is true let mut counter = 1; while flag.load(Relaxed) { - // TODO: Need to implement signalling to stop uStreamer + // TODO: Need to implement signaling to stop uStreamer task::sleep(Duration::from_millis(1000)).await; trace!("counter: {counter}");