diff --git a/Cargo.toml b/Cargo.toml index 98c5abd..81cc82b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,4 +6,4 @@ members=[ ] [patch.crates-io] -opcua = { git = 'https://github.com/locka99/opcua.git' } \ No newline at end of file +opcua = { git = 'https://github.com/locka99/opcua.git', commit="a5f452a67a0efbb1faaacda023612eea6d63be22"} \ No newline at end of file diff --git a/chrontext/src/timeseries_database.rs b/chrontext/src/timeseries_database.rs index fcb9a96..58a1d5a 100644 --- a/chrontext/src/timeseries_database.rs +++ b/chrontext/src/timeseries_database.rs @@ -23,25 +23,26 @@ pub trait TimeSeriesQueryable: Send { #[derive(Clone)] pub enum DatabaseType { BigQuery, - Dremio + Dremio, } pub trait TimeSeriesSQLQueryable { fn get_sql_string( &self, tsq: &TimeSeriesQuery, - database_type:DatabaseType, + database_type: DatabaseType, ) -> Result { - - let query_string; { - let transformer = TimeSeriesQueryToSQLTransformer::new(&self.get_time_series_tables(), database_type.clone()); + let transformer = TimeSeriesQueryToSQLTransformer::new( + &self.get_time_series_tables(), + database_type.clone(), + ); let (query, _) = transformer.create_query(tsq, false)?; query_string = match database_type { - DatabaseType::BigQuery => {query.to_string(BigQueryQueryBuilder)} - DatabaseType::Dremio => {query.to_string(PostgresQueryBuilder)} - }; + DatabaseType::BigQuery => query.to_string(BigQueryQueryBuilder), + DatabaseType::Dremio => query.to_string(PostgresQueryBuilder), + }; debug!("SQL: {}", query_string); } diff --git a/chrontext/src/timeseries_database/bigquery_database.rs b/chrontext/src/timeseries_database/bigquery_database.rs index 2ace418..e21fed8 100644 --- a/chrontext/src/timeseries_database/bigquery_database.rs +++ b/chrontext/src/timeseries_database/bigquery_database.rs @@ -1,4 +1,3 @@ - use crate::timeseries_database::timeseries_sql_rewrite::{ TimeSeriesQueryToSQLError, TimeSeriesTable, }; @@ -80,13 +79,14 @@ impl TimeSeriesQueryable for BigQueryDatabase { dispatcher.run().unwrap(); let (chunks, schema) = destination.arrow().unwrap(); return (chunks, schema); - }).await?; + }) + .await?; let mut series_vec = vec![]; let mut array_ref_vecs = vec![]; for ch in chunks.into_iter() { - for (i,arr) in ch.into_arrays().into_iter().enumerate() { - if array_ref_vecs.len() < i+1 { + for (i, arr) in ch.into_arrays().into_iter().enumerate() { + if array_ref_vecs.len() < i + 1 { array_ref_vecs.push(vec![]); } array_ref_vecs.get_mut(i).unwrap().push(arr) diff --git a/chrontext/src/timeseries_database/opcua_history_read.rs b/chrontext/src/timeseries_database/opcua_history_read.rs index 10059fa..7cd84ec 100644 --- a/chrontext/src/timeseries_database/opcua_history_read.rs +++ b/chrontext/src/timeseries_database/opcua_history_read.rs @@ -10,6 +10,7 @@ use opcua::client::prelude::{ ReadProcessedDetails, ReadRawModifiedDetails, Session, TimestampsToReturn, UAString, UserTokenPolicy, Variant, }; +use opcua::sync::RwLock; use oxrdf::vocab::xsd; use oxrdf::{Literal, Variable}; use polars::export::chrono::{DateTime as ChronoDateTime, Duration, NaiveDateTime, TimeZone, Utc}; @@ -22,8 +23,7 @@ use std::collections::HashMap; use std::error::Error; use std::fmt::{Display, Formatter}; use std::str::FromStr; -use std::sync::{Arc}; -use opcua::sync::RwLock; +use std::sync::Arc; const OPCUA_AGG_FUNC_AVERAGE: u32 = 2342; const OPCUA_AGG_FUNC_COUNT: u32 = 2352; diff --git a/chrontext/src/timeseries_database/timeseries_sql_rewrite.rs b/chrontext/src/timeseries_database/timeseries_sql_rewrite.rs index 9456f96..91e7c34 100644 --- a/chrontext/src/timeseries_database/timeseries_sql_rewrite.rs +++ b/chrontext/src/timeseries_database/timeseries_sql_rewrite.rs @@ -11,8 +11,7 @@ use polars_core::frame::DataFrame; use sea_query::extension::bigquery::{NamedField, Unnest}; use sea_query::IntoIden; use sea_query::{ - Alias, BinOper, ColumnRef, JoinType, Order, Query, SelectStatement, SimpleExpr, - TableRef, + Alias, BinOper, ColumnRef, JoinType, Order, Query, SelectStatement, SimpleExpr, TableRef, }; use sea_query::{Expr as SeaExpr, Iden, Value}; use spargebra::algebra::{AggregateExpression, Expression}; @@ -254,7 +253,8 @@ impl TimeSeriesQueryToSQLTransformer<'_> { ) -> Result<(SelectStatement, HashSet), TimeSeriesQueryToSQLError> { let subquery_alias = "subquery"; let subquery_name = Name::Table(subquery_alias.to_string()); - let mut expr_transformer = self.create_transformer(Some(&subquery_name), self.database_type.clone()); + let mut expr_transformer = + self.create_transformer(Some(&subquery_name), self.database_type.clone()); let se = expr_transformer.sparql_expression_to_sql_expression(e)?; let (select, mut columns) = self.create_query_nested( @@ -557,7 +557,8 @@ impl TimeSeriesQueryToSQLTransformer<'_> { let outer_query_str = "outer_query"; let outer_query_name = Name::Table(outer_query_str.to_string()); let mut new_columns = HashSet::new(); - let mut agg_transformer = self.create_transformer(Some(&outer_query_name), self.database_type.clone()); + let mut agg_transformer = + self.create_transformer(Some(&outer_query_name), self.database_type.clone()); let mut aggs = vec![]; for (_, agg) in aggregations { aggs.push(agg_transformer.sparql_aggregate_expression_to_sql_expression(agg)?); @@ -622,7 +623,7 @@ impl TimeSeriesQueryToSQLTransformer<'_> { fn create_transformer<'a>( &'a self, table_name: Option<&'a Name>, - database_type: DatabaseType + database_type: DatabaseType, ) -> SPARQLToSQLExpressionTransformer { if self.partition_support { SPARQLToSQLExpressionTransformer::new( @@ -630,7 +631,8 @@ impl TimeSeriesQueryToSQLTransformer<'_> { Some(YEAR_PARTITION_COLUMN_NAME), Some(MONTH_PARTITION_COLUMN_NAME), Some(DAY_PARTITION_COLUMN_NAME), - database_type) + database_type, + ) } else { SPARQLToSQLExpressionTransformer::new(table_name, None, None, None, database_type) } @@ -739,6 +741,7 @@ mod tests { use crate::timeseries_database::timeseries_sql_rewrite::{ TimeSeriesQueryToSQLTransformer, TimeSeriesTable, }; + use crate::timeseries_database::DatabaseType; use crate::timeseries_query::{ BasicTimeSeriesQuery, GroupedTimeSeriesQuery, Synchronizer, TimeSeriesQuery, }; @@ -750,7 +753,6 @@ mod tests { use sea_query::PostgresQueryBuilder; use spargebra::algebra::{AggregateExpression, Expression, Function}; use std::vec; - use crate::timeseries_database::DatabaseType; #[test] pub fn test_translate() { diff --git a/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite.rs b/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite.rs index d925617..7e6685e 100644 --- a/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite.rs +++ b/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite.rs @@ -4,7 +4,7 @@ use sea_query::extension::bigquery::{BqFunc, DateTimePart}; use sea_query::IntoIden; use sea_query::{BinOper, ColumnRef, SimpleExpr, UnOper, Value}; use sea_query::{Expr as SeaExpr, Func}; -use spargebra::algebra::{Expression}; +use spargebra::algebra::Expression; use crate::constants::DATETIME_AS_SECONDS; use crate::timeseries_database::timeseries_sql_rewrite::{Name, TimeSeriesQueryToSQLError}; @@ -18,7 +18,7 @@ pub(crate) struct SPARQLToSQLExpressionTransformer<'a> { month_col: Option<&'a str>, day_col: Option<&'a str>, pub used_partitioning: bool, - database_type: DatabaseType + database_type: DatabaseType, } impl SPARQLToSQLExpressionTransformer<'_> { @@ -27,7 +27,7 @@ impl SPARQLToSQLExpressionTransformer<'_> { year_col: Option<&'a str>, month_col: Option<&'a str>, day_col: Option<&'a str>, - database_type: DatabaseType + database_type: DatabaseType, ) -> SPARQLToSQLExpressionTransformer<'a> { SPARQLToSQLExpressionTransformer { table_name, @@ -35,7 +35,7 @@ impl SPARQLToSQLExpressionTransformer<'_> { month_col, day_col, used_partitioning: false, - database_type + database_type, } } @@ -197,7 +197,7 @@ impl SPARQLToSQLExpressionTransformer<'_> { } }; BqFunc::extract(datetime_part, mapped_e) - }, + } DatabaseType::Dremio => { let date_part_name = match f { spargebra::algebra::Function::Year => "year", diff --git a/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite/aggregate_expressions.rs b/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite/aggregate_expressions.rs index 43da282..2d51c8a 100644 --- a/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite/aggregate_expressions.rs +++ b/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite/aggregate_expressions.rs @@ -1,8 +1,8 @@ use super::SPARQLToSQLExpressionTransformer; use crate::timeseries_database::timeseries_sql_rewrite::TimeSeriesQueryToSQLError; +use crate::timeseries_database::DatabaseType; use sea_query::{Func, SimpleExpr}; use spargebra::algebra::AggregateExpression; -use crate::timeseries_database::DatabaseType; impl SPARQLToSQLExpressionTransformer<'_> { //TODO: Support distinct in aggregates.. how??? @@ -29,9 +29,9 @@ impl SPARQLToSQLExpressionTransformer<'_> { AggregateExpression::Min { expr, distinct: _ } => { SimpleExpr::FunctionCall(Func::min(self.sparql_expression_to_sql_expression(expr)?)) } - AggregateExpression::Max { expr, distinct: _ } => SimpleExpr::FunctionCall( - Func::max(self.sparql_expression_to_sql_expression(expr)?) - ), + AggregateExpression::Max { expr, distinct: _ } => { + SimpleExpr::FunctionCall(Func::max(self.sparql_expression_to_sql_expression(expr)?)) + } AggregateExpression::GroupConcat { expr: _, distinct: _, diff --git a/chrontext/src/timeseries_database/timeseries_sql_rewrite/partitioning_support.rs b/chrontext/src/timeseries_database/timeseries_sql_rewrite/partitioning_support.rs index 81611c8..7132b0b 100644 --- a/chrontext/src/timeseries_database/timeseries_sql_rewrite/partitioning_support.rs +++ b/chrontext/src/timeseries_database/timeseries_sql_rewrite/partitioning_support.rs @@ -1,7 +1,7 @@ use super::Name; use log::debug; use polars_core::export::chrono::Datelike; -use sea_query::{IntoIden, BinOper, ColumnRef, SimpleExpr, Value, FunctionCall}; +use sea_query::{BinOper, ColumnRef, FunctionCall, IntoIden, SimpleExpr, Value}; pub fn add_partitioned_timestamp_conditions( se: SimpleExpr, @@ -39,8 +39,14 @@ pub fn add_partitioned_timestamp_conditions( }) .collect(); let added = rewrites_and_added.iter().fold(false, |x, (_, y)| x || *y); - let se_rewrites: Vec = rewrites_and_added.into_iter().map(|(x, _)| x).collect(); - (SimpleExpr::FunctionCall(FunctionCall::new(func_call.get_func().clone()).args(se_rewrites)), added) + let se_rewrites: Vec = + rewrites_and_added.into_iter().map(|(x, _)| x).collect(); + ( + SimpleExpr::FunctionCall( + FunctionCall::new(func_call.get_func().clone()).args(se_rewrites), + ), + added, + ) } SimpleExpr::Binary(left, op, right) => rewrite_binary_expression( *left, @@ -539,9 +545,9 @@ fn smaller_than_or_original( } fn named_column_box_simple_expression(name: String) -> Box { - Box::new( - SimpleExpr::Column(ColumnRef::Column(Name::Column(name).into_iden()) - )) + Box::new(SimpleExpr::Column(ColumnRef::Column( + Name::Column(name).into_iden(), + ))) } fn year_equal_and_month_equal_and_day_equal( diff --git a/chrontext/tests/opcua_data_provider.rs b/chrontext/tests/opcua_data_provider.rs index 8261854..59dc4d5 100644 --- a/chrontext/tests/opcua_data_provider.rs +++ b/chrontext/tests/opcua_data_provider.rs @@ -1,4 +1,5 @@ use opcua::server::prelude::*; +use opcua::sync::RwLock; use polars::export::chrono::{DateTime as PolarsDateTime, Utc as PolarsUtc}; use polars::export::chrono::{NaiveDateTime, Utc}; use polars::prelude::{col, lit, DataType as PolarsDataType, IntoLazy}; @@ -7,8 +8,7 @@ use polars_core::frame::DataFrame; use polars_core::prelude::TimeUnit; use std::collections::HashMap; use std::ops::{Div, Mul}; -use std::sync::{Arc}; -use opcua::sync::RwLock; +use std::sync::Arc; const OPCUA_AGG_FUNC_AVERAGE: u32 = 2342; #[allow(dead_code)] diff --git a/py_chrontext/Cargo.toml b/py_chrontext/Cargo.toml index cd865b1..f0f7878 100644 --- a/py_chrontext/Cargo.toml +++ b/py_chrontext/Cargo.toml @@ -50,4 +50,7 @@ python-source = "python" mimalloc = { version = "0.1.37", default-features = false } [target.'cfg(target_os = "linux")'.dependencies] -jemallocator = { version = "0.5.4", features = ["disable_initial_exec_tls"] } \ No newline at end of file +jemallocator = { version = "0.5.4", features = ["disable_initial_exec_tls"] } + +[patch.crates-io] +opcua = { git = 'https://github.com/locka99/opcua.git', commit="a5f452a67a0efbb1faaacda023612eea6d63be22"} \ No newline at end of file