Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RetainBuffer | basic #38

Merged
merged 12 commits into from
Nov 16, 2023
11 changes: 10 additions & 1 deletion src/conf/tcp_client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct TcpClientConfig {
pub(crate) reconnectCycle: Option<Duration>,
pub(crate) recvQueue: String,
pub(crate) recvQueueMaxLength: i64,
pub(crate) sendQueue: String,
}
///
///
Expand Down Expand Up @@ -71,12 +72,19 @@ impl TcpClientConfig {
Some((keyword, mut selfRecvQueue)) => {
let name = format!("{} {} {}", keyword.prefix(), keyword.kind().to_string(), keyword.name());
debug!("TcpClientConfig.new | self in-queue params {}: {:?}", name, selfRecvQueue);
// let mut params = Self::getParam(&mut selfConf, &mut selfNodeNames, &name).unwrap();
let maxLength = Self::getParam(&mut selfRecvQueue, &mut vec![String::from("max-length")], "max-length").unwrap().as_i64().unwrap();
(keyword.name(), maxLength)
},
None => panic!("TcpClientConfig.new | in queue - not found in : {:?}", selfConf),
};
let selfSendQueue = match Self::getParamByKeyword(&mut selfConf, &mut selfNodeNames, "out", ConfKind::Queue) {
Some((keyword, selfRecvQueue)) => {
let name = format!("{} {} {}", keyword.prefix(), keyword.kind().to_string(), keyword.name());
debug!("TcpClientConfig.new | self out-queue param {}: {:?}", name, selfRecvQueue);
keyword.name()
},
None => panic!("TcpClientConfig.new | in queue - not found in : {:?}", selfConf),
};
debug!("TcpClientConfig.new | selfRecvQueue: {},\tmax-length: {}", selfRecvQueue, selfRecvQueueMaxLength);
TcpClientConfig {
name: selfName,
Expand All @@ -85,6 +93,7 @@ impl TcpClientConfig {
reconnectCycle: selfReconnectCycle,
recvQueue: selfRecvQueue,
recvQueueMaxLength: selfRecvQueueMaxLength,
sendQueue: selfSendQueue,
}
},
None => {
Expand Down
23 changes: 20 additions & 3 deletions src/core_/point/point.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,18 @@ pub struct Point<T> {
pub status: u8,
pub timestamp: DateTime<chrono::Utc>,
}


impl<T> Point<T> {
pub fn new(name: &str, value: T, status: u8, timestamp: DateTime<chrono::Utc>,) -> Point<T> {
Self {
name: name.to_owned(),
value,
status,
timestamp,
}
}
}
///
///
impl Point<Bool> {
///
/// creates Point<Bool> with given name & value, taking current timestamp
Expand All @@ -26,6 +36,8 @@ impl Point<Bool> {
}
}
}
///
///
impl Point<i64> {
///
/// creates Point<i64> with given name & value, taking current timestamp
Expand All @@ -38,6 +50,8 @@ impl Point<i64> {
}
}
}
///
///
impl Point<f64> {
///
/// creates Point<f64> with given name & value, taking current timestamp
Expand All @@ -50,6 +64,8 @@ impl Point<f64> {
}
}
}
///
///
impl Point<String> {
///
/// creates Point<String> with given name & value, taking current timestamp
Expand All @@ -62,7 +78,8 @@ impl Point<String> {
}
}
}

///
///
impl<T: std::ops::Add<Output = T> + Clone> std::ops::Add for Point<T> {
type Output = Point<T>;
fn add(self, rhs: Self) -> Self::Output {
Expand Down
118 changes: 115 additions & 3 deletions src/core_/point/point_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

use std::str::FromStr;

use chrono::DateTime;
use log::trace;
use chrono::{DateTime, Utc};
use log::{trace, warn};
use regex::RegexBuilder;

use crate::core_::types::bool::Bool;
Expand Down Expand Up @@ -50,10 +50,121 @@ pub enum PointType {
Float(Point<f64>),
String(Point<String>)
}
///
///
impl PointType {
///
///
pub fn new<T: ToPoint>(name: &str, value: T) -> Self {
value.toPoint(name)
}
///
///
pub fn fromJsonBytes(bytes: Vec<u8>) -> Result<Self, String> {
match String::from_utf8(bytes) {
Ok(jsonString) => {
match serde_json::from_str(&jsonString) {
Ok(value) => {
let value: serde_json::Value = value;
match value.as_object() {
Some(obj) => {
match obj.get("type") {
Some(type_) => {
match type_.as_str() {
Some("bool") | Some("Bool") => {
let name = obj.get("name").unwrap().as_str().unwrap();
let value = obj.get("value").unwrap().as_bool().unwrap();
let status = obj.get("status").unwrap().as_i64().unwrap();
let timestamp = obj.get("timestamp").unwrap().as_str().unwrap();
let timestamp: DateTime<Utc> = chrono::DateTime::parse_from_rfc3339(timestamp).unwrap().with_timezone(&Utc);
Ok(PointType::Bool(Point::new(
name,
Bool(value),
status as u8,
timestamp,
)))
},
Some("int") | Some("Int") => {
let name = obj.get("name").unwrap().as_str().unwrap();
let value = obj.get("value").unwrap().as_i64().unwrap();
let status = obj.get("status").unwrap().as_i64().unwrap();
let timestamp = obj.get("timestamp").unwrap().as_str().unwrap();
let timestamp: DateTime<Utc> = chrono::DateTime::parse_from_rfc3339(timestamp).unwrap().with_timezone(&Utc);
Ok(PointType::Int(Point::new(
name,
value,
status as u8,
timestamp,
)))
},
Some("float") | Some("Float") => {
let name = obj.get("name").unwrap().as_str().unwrap();
let value = obj.get("value").unwrap().as_f64().unwrap();
let status = obj.get("status").unwrap().as_i64().unwrap();
let timestamp = obj.get("timestamp").unwrap().as_str().unwrap();
let timestamp: DateTime<Utc> = chrono::DateTime::parse_from_rfc3339(timestamp).unwrap().with_timezone(&Utc);
Ok(PointType::Float(Point::new(
name,
value,
status as u8,
timestamp,
)))
},
Some("string") | Some("String") => {
let name = obj.get("name").unwrap().as_str().unwrap();
let value = obj.get("value").unwrap().as_str().unwrap();
let status = obj.get("status").unwrap().as_i64().unwrap();
let timestamp = obj.get("timestamp").unwrap().as_str().unwrap();
let timestamp: DateTime<Utc> = chrono::DateTime::parse_from_rfc3339(timestamp).unwrap().with_timezone(&Utc);
Ok(PointType::String(Point::new(
name,
value.to_owned(),
status as u8,
timestamp,
)))
},
_ => {
let message = format!("PointType.fromBytes | Unknown point type: {}", type_);
warn!("{}", message);
Err(message)
}
}
},
None => {
let message = format!("PointType.fromBytes | JSON convertion error: mapping not found in the JSON: {}", value);
warn!("{}", message);
Err(message)
},
}
},
None => {
let message = format!("PointType.fromBytes | JSON convertion error: mapping not found in the JSON: {}", value);
warn!("{}", message);
Err(message)
},
}
},
Err(err) => {
let message = format!("PointType.fromBytes | JSON convertion error: {:?}", err);
warn!("{}", message);
Err(message)
},
}
// PointType::
},
Err(err) => {
let message = format!("PointType.fromBytes | From bytes error: {:?}", err);
warn!("{}", message);
Err(message)
},
}
}
///
///
pub fn toJsonBytes(&self) -> Result<Vec<u8>, String> {
let mut bytes = vec![];
Ok(bytes)
}
pub fn name(&self) -> String {
match self {
PointType::Bool(point) => point.name.clone(),
Expand Down Expand Up @@ -103,7 +214,8 @@ impl PointType {
}
}
}

///
///
impl FromStr for PointType {
type Err = String;
fn from_str(input: &str) -> Result<PointType, String> {
Expand Down
5 changes: 5 additions & 0 deletions src/core_/retain_buffer/retain_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ impl<T> RetainBuffer<T> {
self.vec.push_back(value);
}
///
/// Returns first value<T> in the buffer
pub fn first(&mut self) -> Option<&T> {
self.vec.front()
}
///
/// Returns and removes first value<T> in the buffer
pub fn popFirst(&mut self) -> Option<T> {
self.vec.pop_front()
Expand Down
11 changes: 5 additions & 6 deletions src/services/api_cient/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{sync::{mpsc::{Receiver, Sender, self}, Arc, atomic::{AtomicBool, Order
use log::{info, debug, trace, warn};

use crate::{
core_::{point::point_type::PointType, net::connection_status::ConnectionStatus},
core_::{point::point_type::PointType, net::connection_status::ConnectionStatus, retain_buffer::retain_buffer::RetainBuffer},
conf::api_client_config::ApiClientConfig,
services::{task::task_cycle::ServiceCycle, api_cient::api_reply::SqlReply, service::Service},
tcp::tcp_socket_client_connect::TcpSocketClientConnect,
Expand Down Expand Up @@ -43,7 +43,7 @@ impl ApiClient {
}
///
/// Reads all avalible at the moment items from the in-queue
fn readQueue(selfId: &str, recv: &Receiver<PointType>, buffer: &mut Vec<PointType>) {
fn readQueue(selfId: &str, recv: &Receiver<PointType>, buffer: &mut RetainBuffer<PointType>) {
let maxReadAtOnce = 1000;
for (index, point) in recv.try_iter().enumerate() {
debug!("{}.readQueue | point: {:?}", selfId, &point);
Expand Down Expand Up @@ -147,9 +147,8 @@ impl ApiClient {
impl Service for ApiClient {
///
/// returns sender of the ApiClient queue by name
fn getLink(&self, name: impl Into<String>) -> Sender<PointType> {
let name = name.into();
match self.send.get(&name) {
fn getLink(&self, name: &str) -> Sender<PointType> {
match self.send.get(name) {
Some(send) => send.clone(),
None => panic!("{}.run | link '{:?}' - not found", self.id, name),
}
Expand All @@ -170,7 +169,7 @@ impl Service for ApiClient {
let _queueMaxLength = conf.recvQueueMaxLength;
let _h = thread::Builder::new().name(format!("{} - main", selfId)).spawn(move || {
let mut isConnected = false;
let mut buffer = Vec::new();
let mut buffer = RetainBuffer::new(&selfId, "", Some(conf.recvQueueMaxLength as usize));
let mut cycle = ServiceCycle::new(cycleInterval);
let mut connect = TcpSocketClientConnect::new(selfId.clone() + "/TcpSocketClientConnect", conf.address);
let mut stream = None;
Expand Down
3 changes: 2 additions & 1 deletion src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ pub mod tcp_client;
pub mod cma_client;
pub mod queues;
pub mod task;
pub mod service;
pub mod service;
pub mod services;
2 changes: 1 addition & 1 deletion src/services/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::mpsc::Sender;
use crate::core_::point::point_type::PointType;

pub trait Service {
fn getLink(&self, name: impl Into<String>) -> Sender<PointType>;
fn getLink(&self, name: &str) -> Sender<PointType>;
fn run(&mut self);
fn exit(&self);
}
32 changes: 32 additions & 0 deletions src/services/services.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#![allow(non_snake_case)]

use std::collections::HashMap;

use super::service::Service;

///
/// Holds a map of the all services in app by there names
pub struct Services {
id: String,
map: HashMap<String, Box<dyn Service>>,
}
///
///
impl Services {
///
/// Creates new instance of the ReatinBuffer
pub fn new(parent: impl Into<String>) -> Self {
Self {
id: format!("{}/RetainBuffer({})", parent.into(), "Services"),
map: HashMap::new(),
}
}
///
///
pub fn get(&self, name: &str) -> &Box<dyn Service> {
match self.map.get(name) {
Some(srvc) => srvc,
None => panic!("{}.get | service '{:?}' - not found", self.id, name),
}
}
}
Loading
Loading