Skip to content

Commit

Permalink
Task | FnVaFft | Threshold filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
a-givertzman committed Sep 23, 2024
1 parent d7acdd1 commit 5a3ac24
Showing 1 changed file with 80 additions and 24 deletions.
104 changes: 80 additions & 24 deletions src/services/task/nested_function/va/fn_va_fft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use concat_in_place::strcat;
use derivative::Derivative;
use rustfft::{num_complex::ComplexFloat, Fft, FftPlanner};
use sal_sync::services::{
entity::{cot::Cot, name::Name, point::{point::Point, point_config::PointConfig, point_config_type::PointConfigType, point_hlr::PointHlr, point_tx_id::PointTxId}, status::status::Status}, service::link_name::LinkName, types::bool::Bool
entity::{cot::Cot, name::Name, point::{point::Point, point_config::PointConfig, point_config_filters::PointConfigFilter, point_config_type::PointConfigType, point_hlr::PointHlr, point_tx_id::PointTxId}, status::status::Status}, service::link_name::LinkName, types::bool::Bool
};
use std::{str::FromStr, sync::{atomic::{AtomicUsize, Ordering}, mpsc::Sender, Arc, RwLock}};
use crate::{
conf::fn_::{fn_conf_kind::FnConfKind, fn_config::FnConfig}, core_::types::fn_in_out_ref::FnInOutRef, services::{safe_lock::rwlock::SafeLock, services::Services, task::nested_function::{
conf::fn_::{fn_conf_kind::FnConfKind, fn_config::FnConfig}, core_::{filter::{filter::{Filter, FilterEmpty}, filter_threshold::FilterThreshold}, types::fn_in_out_ref::FnInOutRef}, services::{safe_lock::rwlock::SafeLock, services::Services, task::nested_function::{
fn_::{FnIn, FnInOut, FnOut},
fn_kind::FnKind, fn_result::FnResult,
}}
Expand All @@ -31,7 +31,7 @@ static COUNT: AtomicUsize = AtomicUsize::new(1);
/// fn VaFft:
/// enable: const bool true # optional, default true
/// send-to: /AppTest/MultiQueue.in-queue
/// conf point Fft: # full name will be: /App/Task/Fft.freq
/// conf point Fft: # Conf for Point's to be exported full name will be: /App/Task/Fft.freq
/// type: 'Real' # Double / Real / Int
/// input: point string /AppTest/Exit
/// freq: 300000 # Sampling freq
Expand All @@ -48,6 +48,7 @@ pub struct FnVaFft {
id: String,
kind: FnKind,
enable: Option<FnInOutRef>,
/// Point config for exported Point's
point_conf: PointConfig,
fft_size: usize,
input: FnInOutRef,
Expand All @@ -60,6 +61,8 @@ pub struct FnVaFft {
#[derivative(Debug="ignore")]
fft_buf: FftBuf,
sampl_freq: usize,
threshold: Option<PointConfigFilter>,
filters: Vec<Box<dyn Filter<Item = Point> + Send>>,
tx_send: Option<Sender<Point>>,
}
//
Expand All @@ -81,7 +84,47 @@ impl FnVaFft {
Err(_) => panic!("{}.new | Parameter 'freq' - missed", self_id),
};
log::debug!("{}.new | sampl_freq: {:?}", self_id, sampl_freq);
let point_conf = match conf.clone().input_conf("conf") {
let point_conf = Self::export_point_conf(parent, &self_id, &conf);
log::debug!("{}.new | point_conf: {:#?}", self_id, point_conf);
let threshold = Self::threshold_conf(&self_id, &conf);
log::debug!("{}.new | threshold: {:#?}", self_id, threshold);
let send_to = Self::send_to_conf(&self_id, &conf, &services);
log::debug!("{}.new | send_to: {:#?}", self_id, threshold);
let fft_buf = FftBuf::new(fft_size, sampl_freq);
Self {
tx_id: PointTxId::from_str(&self_id),
id: self_id,
kind: FnKind::Fn,
enable,
point_conf,
fft_size,
input,
fft: FftPlanner::new().plan_fft_forward(fft_size),
fft_freqs: (0..fft_size / 2).map(|i| format!("{:?}", fft_buf.freq_of(i)) ).collect(),
amp_factor: fft_buf.amp_factor(),
fft_buf,
sampl_freq,
threshold,
filters: vec![],
tx_send: send_to,
}
}
///
///
fn filter(conf: Option<PointConfigFilter>) -> Box<dyn Filter<Item = f64> + Send> {
match conf {
Some(conf) => {
Box::new(
FilterThreshold::new(0.0f64, conf.threshold, conf.factor.unwrap_or(0.0))
)
}
None => Box::new(FilterEmpty::<f64>::new(0.0)),
}
}
///
/// Returns Export point_conf
fn export_point_conf(parent: impl Into<String>, self_id: &str, conf: &FnConfig) -> PointConfig {
match conf.clone().input_conf("conf") {
Ok(conf) => match conf {
FnConfKind::PointConf(conf) => match conf.conf.type_ {
PointConfigType::Int | PointConfigType::Real | PointConfigType::Double => conf.conf.clone(),
Expand All @@ -93,38 +136,51 @@ impl FnVaFft {
conf point Fft:
type: 'Real'
"#).unwrap()),
};
log::debug!("{}.new | point_conf: {:#?}", self_id, point_conf);
let send_to = match conf.param("send-to") {
}
}
///
/// Returns Threshold config
fn threshold_conf(self_id: &str, conf: &FnConfig) -> Option<PointConfigFilter> {
match conf.param("threshold") {
Ok(threshold) => match threshold {
FnConfKind::Param(threshold) => match serde_yaml::from_value(threshold.conf.clone()) {
Ok(threshold) => {
let threshold: PointConfigFilter = threshold;
Some(threshold)
}
Err(err) => {
log::warn!("{}.new | Invalid Threshold filter config in: {:?}, \n\t error: {:#?}", self_id, conf, err);
None
}
}
_ => {
log::warn!("{}.new | Invalid Threshold filter config in: {:?}", self_id, conf);
None
}
}
Err(_) => {
log::warn!("{}.new | Threshold filter config missed in: {:?}", self_id, conf);
None
},
}
}
///
/// Returns send_to
fn send_to_conf(self_id: &str, conf: &FnConfig, services: &Arc<RwLock<Services>>) -> Option<Sender<Point>> {
match conf.param("send-to") {
Ok(send_to) => {
let send_to = match send_to {
FnConfKind::Param(send_to) => LinkName::from_str(send_to.conf.as_str().unwrap()).unwrap(),
_ => panic!("{}.new | Parameter 'send-to' - invalid type (string expected): {:#?}", self_id, send_to),
};
log::debug!("{}.new | send-to: {:?}", self_id, send_to);
let services_lock = services.rlock(&self_id);
let services_lock = services.rlock(self_id);
services_lock.get_link(&send_to).map_or(None, |send| Some(send))
}
Err(_) => {
log::warn!("{}.new | Parameter 'send-to' - missed in {:#?}", self_id, conf);
None
},
};
let fft_buf = FftBuf::new(fft_size, sampl_freq);
Self {
tx_id: PointTxId::from_str(&self_id),
id: self_id,
kind: FnKind::Fn,
enable,
point_conf,
fft_size,
input,
fft: FftPlanner::new().plan_fft_forward(fft_size),
fft_freqs: (0..fft_size / 2).map(|i| format!("{:?}", fft_buf.freq_of(i)) ).collect(), //Self::frequencies(sampl_freq, fft_size), //.iter().map(|f| f.to_string()).collect(),
amp_factor: fft_buf.amp_factor(),
fft_buf,
sampl_freq,
tx_send: send_to,
}
}
///
Expand Down

0 comments on commit 5a3ac24

Please sign in to comment.