From 5a3ac24e37cd505da81a9caa9a8031c04c94666a Mon Sep 17 00:00:00 2001 From: a-givertzman Date: Mon, 23 Sep 2024 16:18:12 +0300 Subject: [PATCH] Task | FnVaFft | Threshold filtering --- .../task/nested_function/va/fn_va_fft.rs | 104 ++++++++++++++---- 1 file changed, 80 insertions(+), 24 deletions(-) diff --git a/src/services/task/nested_function/va/fn_va_fft.rs b/src/services/task/nested_function/va/fn_va_fft.rs index 10b4c7338..cc521a833 100644 --- a/src/services/task/nested_function/va/fn_va_fft.rs +++ b/src/services/task/nested_function/va/fn_va_fft.rs @@ -3,11 +3,11 @@ use concat_in_place::strcat; use derivative::Derivative; use rustfft::{num_complex::ComplexFloat, Fft, FftPlanner}; use sal_sync::services::{ - entity::{cot::Cot, name::Name, point::{point::Point, point_config::PointConfig, point_config_type::PointConfigType, point_hlr::PointHlr, point_tx_id::PointTxId}, status::status::Status}, service::link_name::LinkName, types::bool::Bool + entity::{cot::Cot, name::Name, point::{point::Point, point_config::PointConfig, point_config_filters::PointConfigFilter, point_config_type::PointConfigType, point_hlr::PointHlr, point_tx_id::PointTxId}, status::status::Status}, service::link_name::LinkName, types::bool::Bool }; use std::{str::FromStr, sync::{atomic::{AtomicUsize, Ordering}, mpsc::Sender, Arc, RwLock}}; use crate::{ - conf::fn_::{fn_conf_kind::FnConfKind, fn_config::FnConfig}, core_::types::fn_in_out_ref::FnInOutRef, services::{safe_lock::rwlock::SafeLock, services::Services, task::nested_function::{ + conf::fn_::{fn_conf_kind::FnConfKind, fn_config::FnConfig}, core_::{filter::{filter::{Filter, FilterEmpty}, filter_threshold::FilterThreshold}, types::fn_in_out_ref::FnInOutRef}, services::{safe_lock::rwlock::SafeLock, services::Services, task::nested_function::{ fn_::{FnIn, FnInOut, FnOut}, fn_kind::FnKind, fn_result::FnResult, }} @@ -31,7 +31,7 @@ static COUNT: AtomicUsize = AtomicUsize::new(1); /// fn VaFft: /// enable: const bool true # optional, default true /// send-to: /AppTest/MultiQueue.in-queue -/// conf point Fft: # full name will be: /App/Task/Fft.freq +/// conf point Fft: # Conf for Point's to be exported full name will be: /App/Task/Fft.freq /// type: 'Real' # Double / Real / Int /// input: point string /AppTest/Exit /// freq: 300000 # Sampling freq @@ -48,6 +48,7 @@ pub struct FnVaFft { id: String, kind: FnKind, enable: Option, + /// Point config for exported Point's point_conf: PointConfig, fft_size: usize, input: FnInOutRef, @@ -60,6 +61,8 @@ pub struct FnVaFft { #[derivative(Debug="ignore")] fft_buf: FftBuf, sampl_freq: usize, + threshold: Option, + filters: Vec + Send>>, tx_send: Option>, } // @@ -81,7 +84,47 @@ impl FnVaFft { Err(_) => panic!("{}.new | Parameter 'freq' - missed", self_id), }; log::debug!("{}.new | sampl_freq: {:?}", self_id, sampl_freq); - let point_conf = match conf.clone().input_conf("conf") { + let point_conf = Self::export_point_conf(parent, &self_id, &conf); + log::debug!("{}.new | point_conf: {:#?}", self_id, point_conf); + let threshold = Self::threshold_conf(&self_id, &conf); + log::debug!("{}.new | threshold: {:#?}", self_id, threshold); + let send_to = Self::send_to_conf(&self_id, &conf, &services); + log::debug!("{}.new | send_to: {:#?}", self_id, threshold); + let fft_buf = FftBuf::new(fft_size, sampl_freq); + Self { + tx_id: PointTxId::from_str(&self_id), + id: self_id, + kind: FnKind::Fn, + enable, + point_conf, + fft_size, + input, + fft: FftPlanner::new().plan_fft_forward(fft_size), + fft_freqs: (0..fft_size / 2).map(|i| format!("{:?}", fft_buf.freq_of(i)) ).collect(), + amp_factor: fft_buf.amp_factor(), + fft_buf, + sampl_freq, + threshold, + filters: vec![], + tx_send: send_to, + } + } + /// + /// + fn filter(conf: Option) -> Box + Send> { + match conf { + Some(conf) => { + Box::new( + FilterThreshold::new(0.0f64, conf.threshold, conf.factor.unwrap_or(0.0)) + ) + } + None => Box::new(FilterEmpty::::new(0.0)), + } + } + /// + /// Returns Export point_conf + fn export_point_conf(parent: impl Into, self_id: &str, conf: &FnConfig) -> PointConfig { + match conf.clone().input_conf("conf") { Ok(conf) => match conf { FnConfKind::PointConf(conf) => match conf.conf.type_ { PointConfigType::Int | PointConfigType::Real | PointConfigType::Double => conf.conf.clone(), @@ -93,38 +136,51 @@ impl FnVaFft { conf point Fft: type: 'Real' "#).unwrap()), - }; - log::debug!("{}.new | point_conf: {:#?}", self_id, point_conf); - let send_to = match conf.param("send-to") { + } + } + /// + /// Returns Threshold config + fn threshold_conf(self_id: &str, conf: &FnConfig) -> Option { + match conf.param("threshold") { + Ok(threshold) => match threshold { + FnConfKind::Param(threshold) => match serde_yaml::from_value(threshold.conf.clone()) { + Ok(threshold) => { + let threshold: PointConfigFilter = threshold; + Some(threshold) + } + Err(err) => { + log::warn!("{}.new | Invalid Threshold filter config in: {:?}, \n\t error: {:#?}", self_id, conf, err); + None + } + } + _ => { + log::warn!("{}.new | Invalid Threshold filter config in: {:?}", self_id, conf); + None + } + } + Err(_) => { + log::warn!("{}.new | Threshold filter config missed in: {:?}", self_id, conf); + None + }, + } + } + /// + /// Returns send_to + fn send_to_conf(self_id: &str, conf: &FnConfig, services: &Arc>) -> Option> { + match conf.param("send-to") { Ok(send_to) => { let send_to = match send_to { FnConfKind::Param(send_to) => LinkName::from_str(send_to.conf.as_str().unwrap()).unwrap(), _ => panic!("{}.new | Parameter 'send-to' - invalid type (string expected): {:#?}", self_id, send_to), }; log::debug!("{}.new | send-to: {:?}", self_id, send_to); - let services_lock = services.rlock(&self_id); + let services_lock = services.rlock(self_id); services_lock.get_link(&send_to).map_or(None, |send| Some(send)) } Err(_) => { log::warn!("{}.new | Parameter 'send-to' - missed in {:#?}", self_id, conf); None }, - }; - let fft_buf = FftBuf::new(fft_size, sampl_freq); - Self { - tx_id: PointTxId::from_str(&self_id), - id: self_id, - kind: FnKind::Fn, - enable, - point_conf, - fft_size, - input, - fft: FftPlanner::new().plan_fft_forward(fft_size), - fft_freqs: (0..fft_size / 2).map(|i| format!("{:?}", fft_buf.freq_of(i)) ).collect(), //Self::frequencies(sampl_freq, fft_size), //.iter().map(|f| f.to_string()).collect(), - amp_factor: fft_buf.amp_factor(), - fft_buf, - sampl_freq, - tx_send: send_to, } } ///