Skip to content

Commit

Permalink
Adding remaining relevant initial configurations
Browse files Browse the repository at this point in the history
  • Loading branch information
PLeVasseur committed Jun 26, 2024
1 parent 33ec076 commit 43da570
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 120 deletions.
13 changes: 10 additions & 3 deletions up-linux-streamer-plugin/DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
////
//// This file presents the default configuration used by both the `zenoh-plugin-mqtt` plugin and the `zenoh-bridge-mqtt` standalone executable.
//// The "mqtt" JSON5 object below can be used as such in the "plugins" part of a config file for the zenoh router (zenohd).
//// This file presents the default configuration used by both the `up-linux-streamer-plugin` plugin.
//// The "up_linux_streamer" JSON5 object below can be used as such in the "plugins" part of a config file for the zenoh router (zenohd).
////
{
plugins: {
up_linux_streamer: {
__required__: true,
up_streamer_config: {
message_queue_size: 10000
},
host_config: {
transport: "Zenoh",
authority: "linux"
},
someip_config: {
authority: "me_authority"
authority: "me_authority",
config_file: "vsomeip-configs/point_to_point.json",
default_someip_application_id_for_someip_subscriptions: 10,
enabled: true
},
}
}
Expand Down
17 changes: 17 additions & 0 deletions up-linux-streamer-plugin/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,38 @@
use serde::{Deserialize, Serialize};
use std::path::PathBuf;

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct Config {
pub(crate) __required__: bool,
pub(crate) up_streamer_config: UpStreamerConfig,
pub(crate) host_config: HostConfig,
pub(crate) someip_config: SomeipConfig,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct UpStreamerConfig {
pub(crate) message_queue_size: u16,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct HostConfig {
pub(crate) transport: HostTransport,
pub(crate) authority: String,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct SomeipConfig {
pub(crate) authority: String,
pub(crate) config_file: PathBuf,
pub(crate) default_someip_application_id_for_someip_subscriptions: u16,
pub(crate) enabled: bool,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub enum HostTransport {
Zenoh,
}
192 changes: 75 additions & 117 deletions up-linux-streamer-plugin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
#![recursion_limit = "256"]

// TODO: Consider if we're ever likely to want to use this _not_ as a Zenoh plugin, in which
// case the config module should be made pub and we should add validation on top of setting
// its members
mod config;

use crate::config::HostTransport;
use async_std::task;
use config::Config;
use futures::select;
Expand Down Expand Up @@ -31,18 +35,14 @@ use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginCont
use zenoh_result::{bail, zerror, ZResult};

// The struct implementing the ZenohPlugin and ZenohPlugin traits
pub struct ExamplePlugin {}
pub struct UpLinuxStreamerPlugin {}

// declaration of the plugin's VTable for zenohd to find the plugin's functions to be called
#[cfg(feature = "dynamic_plugin")]
zenoh_plugin_trait::declare_plugin!(ExamplePlugin);
zenoh_plugin_trait::declare_plugin!(UpLinuxStreamerPlugin);

// A default selector for this example of storage plugin (in case the config doesn't set it)
// This plugin will subscribe to this selector and declare a queryable with this selector
const DEFAULT_SELECTOR: &str = "demo/example/**";

impl ZenohPlugin for ExamplePlugin {}
impl Plugin for ExamplePlugin {
impl ZenohPlugin for UpLinuxStreamerPlugin {}
impl Plugin for UpLinuxStreamerPlugin {
type StartArgs = Runtime;
type Instance = zenoh::plugins::RunningPlugin;

Expand All @@ -56,7 +56,6 @@ impl Plugin for ExamplePlugin {
zenoh_util::try_init_log_from_env();
trace!("up-linux-streamer-plugin: start");

trace!("Attempting to do same style as mqtt plugin for reading config");
let runtime_conf = runtime.config().lock();
let plugin_conf = runtime_conf
.plugin(name)
Expand All @@ -79,7 +78,6 @@ impl Plugin for ExamplePlugin {
name: name.into(),
runtime: runtime.clone(),
}))));

trace!("up-linux-streamer-plugin: after creating RunningPlugin");

Ok(ret)
Expand All @@ -101,48 +99,12 @@ impl PluginControl for RunningPlugin {}
impl RunningPluginTrait for RunningPlugin {
fn config_checker(
&self,
path: &str,
old: &serde_json::Map<String, serde_json::Value>,
new: &serde_json::Map<String, serde_json::Value>,
_path: &str,
_old: &serde_json::Map<String, serde_json::Value>,
_new: &serde_json::Map<String, serde_json::Value>,
) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>> {
let mut guard = zlock!(&self.0);
const STORAGE_SELECTOR: &str = "storage-selector";
if path == STORAGE_SELECTOR || path.is_empty() {
match (old.get(STORAGE_SELECTOR), new.get(STORAGE_SELECTOR)) {
(Some(serde_json::Value::String(os)), Some(serde_json::Value::String(ns)))
if os == ns => {}
(_, Some(serde_json::Value::String(selector))) => {
guard.flag.store(false, Relaxed);
guard.flag = Arc::new(AtomicBool::new(true));
match KeyExpr::try_from(selector.clone()) {
Err(e) => tracing::error!("{}", e),
Ok(selector) => {
// TODO: Figure out the role of this config_checker
// async_std::task::spawn(run(
// guard.runtime.clone(),
// selector,
// guard.flag.clone(),
// ));
}
}
return Ok(None);
}
(_, None) => {
guard.flag.store(false, Relaxed);
}
_ => {
bail!(
"up-linux-streamer-plugin: storage-selector for {} must be a string",
&guard.name
)
}
}
}
bail!(
"up-linux-streamer-plugin: unknown option {} for {}",
path,
guard.name
)
// TODO: Learn more about how the config_checker is used
return Ok(None);
}
}

Expand All @@ -164,84 +126,80 @@ async fn run(runtime: Runtime, config: Config, flag: Arc<AtomicBool>) {

env_logger::init();

let mut streamer = UStreamer::new("up-linux-streamer", 10000);

let exe_path = match env::current_exe() {
Ok(exe_path) => {
if let Some(exe_dir) = exe_path.parent() {
println!("The binary is located in: {}", exe_dir.display());
exe_path
} else {
panic!("Failed to determine the directory of the executable.");
}
}
Err(e) => {
panic!("Failed to get the executable path: {}", e);
}
};
tracing::log::trace!("exe_path: {exe_path:?}");
let exe_path_parent = exe_path.parent();
let Some(exe_path_parent) = exe_path_parent else {
panic!("Unable to get parent path");
};
tracing::log::trace!("exe_path_parent: {exe_path_parent:?}");

// TODO: Make configurable to pass the path to the vsomeip config as a command line argument
let vsomeip_config = PathBuf::from(exe_path_parent).join("vsomeip-configs/point_to_point.json");
tracing::log::trace!("vsomeip_config: {vsomeip_config:?}");
let vsomeip_config = canonicalize(vsomeip_config).ok();
tracing::log::trace!("vsomeip_config: {vsomeip_config:?}");

// There will be a single vsomeip_transport, as there is a connection into device and a streamer
// TODO: Add error handling if we fail to create a UPTransportVsomeip
let vsomeip_transport: Arc<dyn UTransport> = Arc::new(
UPTransportVsomeip::new_with_config(&"linux".to_string(), 10, &vsomeip_config.unwrap())
.unwrap(),
let mut streamer = UStreamer::new(
"up-linux-streamer",
config.up_streamer_config.message_queue_size,
);

// TODO: Probably make somewhat configurable?
// TODO: Add error handling if we fail to create a UPClientZenoh
let zenoh_transport: Arc<dyn UTransport> = Arc::new(
UPClientZenoh::new_with_runtime(runtime.clone(), config.host_config.authority)
.await
.unwrap(),
);
// TODO: Make configurable to pass the name of the mE authority as a command line argument
let vsomeip_endpoint = Endpoint::new(
"vsomeip_endpoint",
&config.someip_config.authority,
vsomeip_transport.clone(),
let host_transport: Arc<dyn UTransport> = Arc::new(match config.host_config.transport {
HostTransport::Zenoh => {
UPClientZenoh::new_with_runtime(runtime.clone(), config.host_config.authority.clone())
.await
.expect("Unable to initialize Zenoh UTransport")
} // other host transports can be added here as they become available
});

let host_endpoint = Endpoint::new(
"host_endpoint",
&config.host_config.authority,
host_transport.clone(),
);

// TODO: Make configurable the ability to have perhaps a config file we pass in that has all the
// relevant authorities over Zenoh that should be forwarded
let zenoh_transport_endpoint_a = Endpoint::new(
"zenoh_transport_endpoint_a",
"linux", // simple initial case of streamer + intended high compute destination on same device
zenoh_transport.clone(),
);

// TODO: Per Zenoh endpoint configured, run these two rules
let forwarding_res = streamer
.add_forwarding_rule(vsomeip_endpoint.clone(), zenoh_transport_endpoint_a.clone())
.await;
if config.someip_config.enabled {
let someip_config_file_abs_path = if config.someip_config.config_file.is_relative() {
env::current_dir()
.unwrap()
.join(&config.someip_config.config_file)
} else {
config.someip_config.config_file
};
tracing::log::trace!("someip_config_file_abs_path: {someip_config_file_abs_path:?}");
if !someip_config_file_abs_path.exists() {
panic!(
"The specified someip config_file doesn't exist: {someip_config_file_abs_path:?}"
);
}

if let Err(err) = forwarding_res {
error!("Unable to add forwarding result: {err:?}");
}
// There will be at most one vsomeip_transport, as there is a connection into device and a streamer
let someip_transport: Arc<dyn UTransport> = Arc::new(
// TODO: Work out changes s.t. the authority given is config.someip_config.authority
// This feels off
UPTransportVsomeip::new_with_config(
&config.host_config.authority,
config
.someip_config
.default_someip_application_id_for_someip_subscriptions,
&someip_config_file_abs_path,
)
.expect("Unable to initialize vsomeip UTransport"),
);

let mechatronics_endpoint = Endpoint::new(
"mechatronics_endpoint",
&config.someip_config.authority,
someip_transport.clone(),
);
let forwarding_res = streamer
.add_forwarding_rule(mechatronics_endpoint.clone(), host_endpoint.clone())
.await;

if let Err(err) = forwarding_res {
panic!("Unable to add forwarding result: {err:?}");
}

let forwarding_res = streamer
.add_forwarding_rule(zenoh_transport_endpoint_a.clone(), vsomeip_endpoint.clone())
.await;
let forwarding_res = streamer
.add_forwarding_rule(host_endpoint.clone(), mechatronics_endpoint.clone())
.await;

if let Err(err) = forwarding_res {
error!("Unable to add forwarding result: {err:?}");
if let Err(err) = forwarding_res {
panic!("Unable to add forwarding result: {err:?}");
}
}

// Plugin's event loop, while the flag is true
let mut counter = 1;
while flag.load(Relaxed) {
// TODO: Need to implement signalling to stop uStreamer
// TODO: Need to implement signaling to stop uStreamer

task::sleep(Duration::from_millis(1000)).await;
trace!("counter: {counter}");
Expand Down

0 comments on commit 43da570

Please sign in to comment.