From cf5e2b46b0f2db86ff6b1c533f2b56de85c0376b Mon Sep 17 00:00:00 2001 From: Magnus Bakken <10287813+magbak@users.noreply.github.com> Date: Wed, 1 Nov 2023 21:29:57 +0100 Subject: [PATCH] Change capitalization and fix bug in logic for avoiding ntriples reload --- README.md | 4 +- chrontext/src/combiner.rs | 24 +-- chrontext/src/combiner/lazy_expressions.rs | 4 +- chrontext/src/combiner/lazy_graph_patterns.rs | 4 +- .../combiner/lazy_graph_patterns/distinct.rs | 4 +- .../combiner/lazy_graph_patterns/extend.rs | 4 +- .../combiner/lazy_graph_patterns/filter.rs | 4 +- .../src/combiner/lazy_graph_patterns/group.rs | 4 +- .../src/combiner/lazy_graph_patterns/join.rs | 4 +- .../combiner/lazy_graph_patterns/left_join.rs | 4 +- .../src/combiner/lazy_graph_patterns/minus.rs | 4 +- .../combiner/lazy_graph_patterns/order_by.rs | 4 +- .../combiner/lazy_graph_patterns/project.rs | 4 +- .../src/combiner/lazy_graph_patterns/union.rs | 4 +- chrontext/src/combiner/time_series_queries.rs | 16 +- chrontext/src/engine.rs | 6 +- chrontext/src/preparing.rs | 16 +- chrontext/src/preparing/expressions.rs | 10 +- .../preparing/expressions/and_expression.rs | 4 +- .../expressions/binary_ordinary_expression.rs | 4 +- .../expressions/coalesce_expression.rs | 4 +- .../expressions/exists_expression.rs | 4 +- .../expressions/function_call_expression.rs | 4 +- .../preparing/expressions/if_expression.rs | 4 +- .../preparing/expressions/in_expression.rs | 4 +- .../preparing/expressions/not_expression.rs | 4 +- .../preparing/expressions/or_expression.rs | 4 +- .../expressions/unary_ordinary_expression.rs | 4 +- chrontext/src/preparing/graph_patterns.rs | 10 +- .../preparing/graph_patterns/bgp_pattern.rs | 8 +- .../graph_patterns/distinct_pattern.rs | 4 +- .../graph_patterns/extend_pattern.rs | 8 +- .../filter_expression_rewrites.rs | 6 +- .../graph_patterns/filter_pattern.rs | 8 +- .../preparing/graph_patterns/graph_pattern.rs | 4 +- .../preparing/graph_patterns/group_pattern.rs | 30 ++-- .../preparing/graph_patterns/join_pattern.rs | 4 +- .../graph_patterns/left_join_pattern.rs | 4 +- .../preparing/graph_patterns/minus_pattern.rs | 4 +- .../graph_patterns/order_by_pattern.rs | 4 +- .../preparing/graph_patterns/path_pattern.rs | 4 +- .../graph_patterns/project_pattern.rs | 4 +- .../graph_patterns/reduced_pattern.rs | 4 +- .../graph_patterns/service_pattern.rs | 4 +- .../graph_patterns/sliced_pattern.rs | 4 +- .../preparing/graph_patterns/union_pattern.rs | 4 +- .../graph_patterns/values_pattern.rs | 4 +- chrontext/src/preparing/synchronization.rs | 8 +- chrontext/src/rewriting.rs | 6 +- .../rewriting/graph_patterns/bgp_pattern.rs | 8 +- chrontext/src/timeseries_database.rs | 18 +- .../arrow_flight_sql_database.rs | 20 +-- .../timeseries_database/bigquery_database.rs | 20 +-- .../timeseries_database/opcua_history_read.rs | 52 +++--- .../simple_in_memory_timeseries.rs | 30 ++-- .../timeseries_sql_rewrite.rs | 132 +++++++------- .../expression_rewrite.rs | 8 +- .../aggregate_expressions.rs | 4 +- chrontext/src/timeseries_query.rs | 162 +++++++++--------- chrontext/tests/query_execution_arrow_sql.rs | 10 +- chrontext/tests/rewrites.rs | 8 +- py_chrontext/Cargo.toml | 2 +- py_chrontext/src/errors.rs | 16 +- py_chrontext/src/lib.rs | 42 ++--- py_chrontext/tests/test_arrow_flight_sql.py | 8 +- py_chrontext/tests/test_opcua.py | 2 +- 66 files changed, 421 insertions(+), 421 deletions(-) diff --git a/README.md b/README.md index ea65cde..b2a4a67 100644 --- a/README.md +++ b/README.md @@ -25,10 +25,10 @@ Query execution is then interleaved and results are combined to produce the answ We can make queries in Python. The code assumes that we have a SPARQL-endpoint and an Arrow Flight SQL-endpoint (Dremio) set up. ```python import pathlib -from chrontext import Engine, TimeseriesDremioDatabase, TimeSeriesTable +from chrontext import Engine, TimeseriesDremioDatabase, TimeseriesTable tables = [ - TimeSeriesTable( + TimeseriesTable( resource_name="my_resource", schema="my_nas", time_series_table="ts.parquet", diff --git a/chrontext/src/combiner.rs b/chrontext/src/combiner.rs index ab93a36..8bc1720 100644 --- a/chrontext/src/combiner.rs +++ b/chrontext/src/combiner.rs @@ -9,11 +9,11 @@ pub(crate) mod time_series_queries; use crate::query_context::Context; use crate::combiner::solution_mapping::SolutionMappings; -use crate::preparing::TimeSeriesQueryPrepper; +use crate::preparing::TimeseriesQueryPrepper; use crate::pushdown_setting::PushdownSetting; use crate::sparql_database::SparqlQueryable; -use crate::timeseries_database::TimeSeriesQueryable; -use crate::timeseries_query::{BasicTimeSeriesQuery, TimeSeriesValidationError}; +use crate::timeseries_database::TimeseriesQueryable; +use crate::timeseries_query::{BasicTimeseriesQuery, TimeseriesValidationError}; use spargebra::algebra::Expression; use spargebra::Query; use std::collections::{HashMap, HashSet}; @@ -22,10 +22,10 @@ use std::fmt::{Display, Formatter}; #[derive(Debug)] pub enum CombinerError { - TimeSeriesQueryError(Box), + TimeseriesQueryError(Box), StaticQueryExecutionError(Box), InconsistentDatatype(String, String, String), - TimeSeriesValidationError(TimeSeriesValidationError), + TimeseriesValidationError(TimeseriesValidationError), ResourceIsNotString(String, String), InconsistentResourceName(String, String, String), } @@ -40,13 +40,13 @@ impl Display for CombinerError { s1, s2, s3 ) } - CombinerError::TimeSeriesQueryError(tsqe) => { + CombinerError::TimeseriesQueryError(tsqe) => { write!(f, "Time series query error {}", tsqe) } CombinerError::StaticQueryExecutionError(sqee) => { write!(f, "Static query execution error {}", sqee) } - CombinerError::TimeSeriesValidationError(v) => { + CombinerError::TimeseriesValidationError(v) => { write!(f, "Time series validation error {}", v) } CombinerError::ResourceIsNotString(value_var, actual_datatype) => { @@ -70,19 +70,19 @@ impl Error for CombinerError {} pub struct Combiner { counter: u16, pub sparql_database: Box, - pub time_series_database: Box, - prepper: TimeSeriesQueryPrepper, + pub time_series_database: Box, + prepper: TimeseriesQueryPrepper, } impl Combiner { pub fn new( sparql_database: Box, pushdown_settings: HashSet, - time_series_database: Box, - basic_time_series_queries: Vec, + time_series_database: Box, + basic_time_series_queries: Vec, rewritten_filters: HashMap, ) -> Combiner { - let prepper = TimeSeriesQueryPrepper::new( + let prepper = TimeseriesQueryPrepper::new( pushdown_settings, basic_time_series_queries, rewritten_filters, diff --git a/chrontext/src/combiner/lazy_expressions.rs b/chrontext/src/combiner/lazy_expressions.rs index 5a7c21f..92f57c1 100644 --- a/chrontext/src/combiner/lazy_expressions.rs +++ b/chrontext/src/combiner/lazy_expressions.rs @@ -13,7 +13,7 @@ use crate::query_context::{Context, PathEntry}; use crate::sparql_result_to_polars::{ sparql_literal_to_polars_literal_value, sparql_named_node_to_polars_literal_value, }; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use async_recursion::async_recursion; use oxrdf::vocab::xsd; use polars::datatypes::DataType; @@ -35,7 +35,7 @@ impl Combiner { expr: &Expression, mut solution_mappings: SolutionMappings, mut static_query_map: Option>, - mut prepared_time_series_queries: Option>>, + mut prepared_time_series_queries: Option>>, context: &Context, ) -> Result { let output_solution_mappings = match expr { diff --git a/chrontext/src/combiner/lazy_graph_patterns.rs b/chrontext/src/combiner/lazy_graph_patterns.rs index 870d801..d8dc3c6 100644 --- a/chrontext/src/combiner/lazy_graph_patterns.rs +++ b/chrontext/src/combiner/lazy_graph_patterns.rs @@ -14,7 +14,7 @@ use crate::combiner::solution_mapping::SolutionMappings; use crate::combiner::CombinerError; use crate::preparing::graph_patterns::GPPrepReturn; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use async_recursion::async_recursion; use log::debug; use spargebra::algebra::GraphPattern; @@ -28,7 +28,7 @@ impl Combiner { graph_pattern: &GraphPattern, solution_mappings: Option, mut static_query_map: HashMap, - prepared_time_series_queries: Option>>, + prepared_time_series_queries: Option>>, context: &Context, ) -> Result { debug!("Processing graph pattern at context: {}", context.as_str()); diff --git a/chrontext/src/combiner/lazy_graph_patterns/distinct.rs b/chrontext/src/combiner/lazy_graph_patterns/distinct.rs index 30fbf5d..f202af1 100644 --- a/chrontext/src/combiner/lazy_graph_patterns/distinct.rs +++ b/chrontext/src/combiner/lazy_graph_patterns/distinct.rs @@ -2,7 +2,7 @@ use super::Combiner; use crate::combiner::solution_mapping::SolutionMappings; use crate::combiner::CombinerError; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use async_recursion::async_recursion; use log::debug; use polars_core::frame::UniqueKeepStrategy; @@ -17,7 +17,7 @@ impl Combiner { inner: &GraphPattern, solution_mappings: Option, static_query_map: HashMap, - prepared_time_series_queries: Option>>, + prepared_time_series_queries: Option>>, context: &Context, ) -> Result { debug!("Processing distinct graph pattern"); diff --git a/chrontext/src/combiner/lazy_graph_patterns/extend.rs b/chrontext/src/combiner/lazy_graph_patterns/extend.rs index 37a323f..07651df 100644 --- a/chrontext/src/combiner/lazy_graph_patterns/extend.rs +++ b/chrontext/src/combiner/lazy_graph_patterns/extend.rs @@ -4,7 +4,7 @@ use crate::combiner::static_subqueries::split_static_queries; use crate::combiner::time_series_queries::split_time_series_queries; use crate::combiner::CombinerError; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use async_recursion::async_recursion; use log::debug; use oxrdf::Variable; @@ -21,7 +21,7 @@ impl Combiner { expression: &Expression, input_solution_mappings: Option, mut static_query_map: HashMap, - mut prepared_time_series_queries: Option>>, + mut prepared_time_series_queries: Option>>, context: &Context, ) -> Result { debug!("Processing extend graph pattern"); diff --git a/chrontext/src/combiner/lazy_graph_patterns/filter.rs b/chrontext/src/combiner/lazy_graph_patterns/filter.rs index 5487992..18c52ca 100644 --- a/chrontext/src/combiner/lazy_graph_patterns/filter.rs +++ b/chrontext/src/combiner/lazy_graph_patterns/filter.rs @@ -4,7 +4,7 @@ use crate::combiner::static_subqueries::split_static_queries; use crate::combiner::time_series_queries::split_time_series_queries; use crate::combiner::CombinerError; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use async_recursion::async_recursion; use log::debug; use polars::prelude::col; @@ -20,7 +20,7 @@ impl Combiner { expression: &Expression, input_solution_mappings: Option, mut static_query_map: HashMap, - mut prepared_time_series_queries: Option>>, + mut prepared_time_series_queries: Option>>, context: &Context, ) -> Result { debug!("Processing filter graph pattern"); diff --git a/chrontext/src/combiner/lazy_graph_patterns/group.rs b/chrontext/src/combiner/lazy_graph_patterns/group.rs index 0e92bcb..6398fb8 100644 --- a/chrontext/src/combiner/lazy_graph_patterns/group.rs +++ b/chrontext/src/combiner/lazy_graph_patterns/group.rs @@ -4,7 +4,7 @@ use crate::combiner::static_subqueries::split_static_queries; use crate::combiner::time_series_queries::split_time_series_queries; use crate::combiner::CombinerError; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use log::debug; use oxrdf::Variable; use polars::prelude::{col, Expr}; @@ -20,7 +20,7 @@ impl Combiner { aggregates: &Vec<(Variable, AggregateExpression)>, solution_mapping: Option, mut static_query_map: HashMap, - mut prepared_time_series_queries: Option>>, + mut prepared_time_series_queries: Option>>, context: &Context, ) -> Result { debug!("Processing group graph pattern"); diff --git a/chrontext/src/combiner/lazy_graph_patterns/join.rs b/chrontext/src/combiner/lazy_graph_patterns/join.rs index 3f393fd..86968c3 100644 --- a/chrontext/src/combiner/lazy_graph_patterns/join.rs +++ b/chrontext/src/combiner/lazy_graph_patterns/join.rs @@ -4,7 +4,7 @@ use crate::combiner::static_subqueries::split_static_queries; use crate::combiner::time_series_queries::split_time_series_queries; use crate::combiner::CombinerError; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use async_recursion::async_recursion; use log::debug; use spargebra::algebra::GraphPattern; @@ -19,7 +19,7 @@ impl Combiner { right: &GraphPattern, solution_mappings: Option, mut static_query_map: HashMap, - mut prepared_time_series_queries: Option>>, + mut prepared_time_series_queries: Option>>, context: &Context, ) -> Result { debug!("Processing join graph pattern"); diff --git a/chrontext/src/combiner/lazy_graph_patterns/left_join.rs b/chrontext/src/combiner/lazy_graph_patterns/left_join.rs index a74a21d..dff10ed 100644 --- a/chrontext/src/combiner/lazy_graph_patterns/left_join.rs +++ b/chrontext/src/combiner/lazy_graph_patterns/left_join.rs @@ -6,7 +6,7 @@ use crate::combiner::static_subqueries::split_static_queries; use crate::combiner::time_series_queries::split_time_series_queries; use crate::combiner::CombinerError; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use async_recursion::async_recursion; use log::debug; use polars::prelude::{col, Expr, IntoLazy}; @@ -25,7 +25,7 @@ impl Combiner { expression: &Option, solution_mapping: Option, mut static_query_map: HashMap, - mut prepared_time_series_queries: Option>>, + mut prepared_time_series_queries: Option>>, context: &Context, ) -> Result { debug!("Processing left join graph pattern"); diff --git a/chrontext/src/combiner/lazy_graph_patterns/minus.rs b/chrontext/src/combiner/lazy_graph_patterns/minus.rs index abb5abf..d2d2948 100644 --- a/chrontext/src/combiner/lazy_graph_patterns/minus.rs +++ b/chrontext/src/combiner/lazy_graph_patterns/minus.rs @@ -4,7 +4,7 @@ use crate::combiner::static_subqueries::split_static_queries; use crate::combiner::time_series_queries::split_time_series_queries; use crate::combiner::CombinerError; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use async_recursion::async_recursion; use log::debug; use polars::prelude::{col, Expr, IntoLazy, LiteralValue}; @@ -21,7 +21,7 @@ impl Combiner { right: &GraphPattern, solution_mappings: Option, mut static_query_map: HashMap, - mut prepared_time_series_queries: Option>>, + mut prepared_time_series_queries: Option>>, context: &Context, ) -> Result { debug!("Processing minus graph pattern"); diff --git a/chrontext/src/combiner/lazy_graph_patterns/order_by.rs b/chrontext/src/combiner/lazy_graph_patterns/order_by.rs index 2284cba..526fb7a 100644 --- a/chrontext/src/combiner/lazy_graph_patterns/order_by.rs +++ b/chrontext/src/combiner/lazy_graph_patterns/order_by.rs @@ -2,7 +2,7 @@ use super::Combiner; use crate::combiner::solution_mapping::SolutionMappings; use crate::combiner::CombinerError; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use async_recursion::async_recursion; use log::debug; use polars::prelude::{col, Expr}; @@ -18,7 +18,7 @@ impl Combiner { expression: &Vec, solution_mappings: Option, static_query_map: HashMap, - prepared_time_series_queries: Option>>, + prepared_time_series_queries: Option>>, context: &Context, ) -> Result { debug!("Processing order by graph pattern"); diff --git a/chrontext/src/combiner/lazy_graph_patterns/project.rs b/chrontext/src/combiner/lazy_graph_patterns/project.rs index add892d..a96da79 100644 --- a/chrontext/src/combiner/lazy_graph_patterns/project.rs +++ b/chrontext/src/combiner/lazy_graph_patterns/project.rs @@ -2,7 +2,7 @@ use super::Combiner; use crate::combiner::lazy_graph_patterns::SolutionMappings; use crate::combiner::CombinerError; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use async_recursion::async_recursion; use log::{debug, warn}; use oxrdf::Variable; @@ -19,7 +19,7 @@ impl Combiner { variables: &Vec, solution_mappings: Option, static_query_map: HashMap, - prepared_time_series_queries: Option>>, + prepared_time_series_queries: Option>>, context: &Context, ) -> Result { debug!("Processing project graph pattern"); diff --git a/chrontext/src/combiner/lazy_graph_patterns/union.rs b/chrontext/src/combiner/lazy_graph_patterns/union.rs index 3ac7893..7b19443 100644 --- a/chrontext/src/combiner/lazy_graph_patterns/union.rs +++ b/chrontext/src/combiner/lazy_graph_patterns/union.rs @@ -4,7 +4,7 @@ use crate::combiner::static_subqueries::split_static_queries; use crate::combiner::time_series_queries::split_time_series_queries; use crate::combiner::CombinerError; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use async_recursion::async_recursion; use log::debug; use polars::prelude::{concat, UnionArgs}; @@ -20,7 +20,7 @@ impl Combiner { right: &GraphPattern, solution_mappings: Option, mut static_query_map: HashMap, - mut prepared_time_series_queries: Option>>, + mut prepared_time_series_queries: Option>>, context: &Context, ) -> Result { debug!("Processing union graph pattern"); diff --git a/chrontext/src/combiner/time_series_queries.rs b/chrontext/src/combiner/time_series_queries.rs index 829d5ef..c5a1a3c 100644 --- a/chrontext/src/combiner/time_series_queries.rs +++ b/chrontext/src/combiner/time_series_queries.rs @@ -2,7 +2,7 @@ use super::Combiner; use crate::combiner::solution_mapping::SolutionMappings; use crate::combiner::CombinerError; use crate::query_context::Context; -use crate::timeseries_query::{BasicTimeSeriesQuery, TimeSeriesQuery}; +use crate::timeseries_query::{BasicTimeseriesQuery, TimeseriesQuery}; use log::debug; use oxrdf::vocab::xsd; use oxrdf::Term; @@ -15,7 +15,7 @@ use std::collections::{HashMap, HashSet}; impl Combiner { pub async fn execute_attach_time_series_query( &mut self, - tsq: &TimeSeriesQuery, + tsq: &TimeseriesQuery, mut solution_mappings: SolutionMappings, ) -> Result { debug!("Executing time series query: {:?}", tsq); @@ -23,10 +23,10 @@ impl Combiner { .time_series_database .execute(tsq) .await - .map_err(|x| CombinerError::TimeSeriesQueryError(x))?; + .map_err(|x| CombinerError::TimeseriesQueryError(x))?; debug!("Time series query results: \n{}", ts_df); tsq.validate(&ts_df) - .map_err(|x| CombinerError::TimeSeriesValidationError(x))?; + .map_err(|x| CombinerError::TimeseriesValidationError(x))?; let mut on: Vec; let mut drop_cols: Vec; @@ -112,9 +112,9 @@ impl Combiner { } pub(crate) fn split_time_series_queries( - time_series_queries: &mut Option>>, + time_series_queries: &mut Option>>, context: &Context, -) -> Option>> { +) -> Option>> { if let Some(tsqs) = time_series_queries { let mut split_keys = vec![]; for k in tsqs.keys() { @@ -135,7 +135,7 @@ pub(crate) fn split_time_series_queries( pub(crate) fn complete_basic_time_series_queries( static_query_solutions: &Vec, - basic_time_series_queries: &mut Vec, + basic_time_series_queries: &mut Vec, ) -> Result<(), CombinerError> { for basic_query in basic_time_series_queries { let mut ids = HashSet::new(); @@ -174,7 +174,7 @@ pub(crate) fn complete_basic_time_series_queries( } } - let get_basic_query_value_var_name = |x: &BasicTimeSeriesQuery| { + let get_basic_query_value_var_name = |x: &BasicTimeseriesQuery| { if let Some(vv) = &x.value_variable { vv.variable.as_str().to_string() } else { diff --git a/chrontext/src/engine.rs b/chrontext/src/engine.rs index 61e492c..133e46e 100644 --- a/chrontext/src/engine.rs +++ b/chrontext/src/engine.rs @@ -4,7 +4,7 @@ use crate::pushdown_setting::PushdownSetting; use crate::rewriting::StaticQueryRewriter; use crate::sparql_database::SparqlQueryable; use crate::splitter::parse_sparql_select_query; -use crate::timeseries_database::TimeSeriesQueryable; +use crate::timeseries_database::TimeseriesQueryable; use log::debug; use polars::frame::DataFrame; use std::collections::HashSet; @@ -12,14 +12,14 @@ use std::error::Error; pub struct Engine { pushdown_settings: HashSet, - time_series_database: Option>, + time_series_database: Option>, pub sparql_database: Option>, } impl Engine { pub fn new( pushdown_settings: HashSet, - time_series_database: Box, + time_series_database: Box, sparql_database: Box, ) -> Engine { Engine { diff --git a/chrontext/src/preparing.rs b/chrontext/src/preparing.rs index 306b780..ac71b36 100644 --- a/chrontext/src/preparing.rs +++ b/chrontext/src/preparing.rs @@ -5,26 +5,26 @@ mod synchronization; use crate::combiner::solution_mapping::SolutionMappings; use crate::pushdown_setting::PushdownSetting; use crate::query_context::Context; -use crate::timeseries_query::{BasicTimeSeriesQuery, TimeSeriesQuery}; +use crate::timeseries_query::{BasicTimeseriesQuery, TimeseriesQuery}; use spargebra::algebra::Expression; use spargebra::Query; use std::collections::{HashMap, HashSet}; #[derive(Debug)] -pub struct TimeSeriesQueryPrepper { +pub struct TimeseriesQueryPrepper { pushdown_settings: HashSet, - pub(crate) basic_time_series_queries: Vec, + pub(crate) basic_time_series_queries: Vec, grouping_counter: u16, rewritten_filters: HashMap, } -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn new( pushdown_settings: HashSet, - basic_time_series_queries: Vec, + basic_time_series_queries: Vec, rewritten_filters: HashMap, - ) -> TimeSeriesQueryPrepper { - TimeSeriesQueryPrepper { + ) -> TimeseriesQueryPrepper { + TimeseriesQueryPrepper { pushdown_settings, basic_time_series_queries, grouping_counter: 0, @@ -36,7 +36,7 @@ impl TimeSeriesQueryPrepper { &mut self, query: &Query, solution_mappings: &mut SolutionMappings, - ) -> HashMap> { + ) -> HashMap> { if let Query::Select { pattern, .. } = query { let pattern_prepared = self.prepare_graph_pattern(pattern, false, solution_mappings, &Context::new()); diff --git a/chrontext/src/preparing/expressions.rs b/chrontext/src/preparing/expressions.rs index ea12d56..2e70e25 100644 --- a/chrontext/src/preparing/expressions.rs +++ b/chrontext/src/preparing/expressions.rs @@ -9,23 +9,23 @@ mod not_expression; mod or_expression; mod unary_ordinary_expression; -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::expressions::binary_ordinary_expression::BinaryOrdinaryOperator; use crate::preparing::expressions::unary_ordinary_expression::UnaryOrdinaryOperator; use crate::query_context::Context; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use spargebra::algebra::Expression; use std::collections::HashMap; #[derive(Debug)] pub struct EXPrepReturn { pub fail_groupby_complex_query: bool, - pub time_series_queries: HashMap>, + pub time_series_queries: HashMap>, } impl EXPrepReturn { - fn new(time_series_queries: HashMap>) -> EXPrepReturn { + fn new(time_series_queries: HashMap>) -> EXPrepReturn { EXPrepReturn { time_series_queries, fail_groupby_complex_query: false, @@ -50,7 +50,7 @@ impl EXPrepReturn { } } -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_expression( &mut self, expression: &Expression, diff --git a/chrontext/src/preparing/expressions/and_expression.rs b/chrontext/src/preparing/expressions/and_expression.rs index a4fee0b..7c790c2 100644 --- a/chrontext/src/preparing/expressions/and_expression.rs +++ b/chrontext/src/preparing/expressions/and_expression.rs @@ -1,10 +1,10 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::expressions::EXPrepReturn; use crate::query_context::{Context, PathEntry}; use spargebra::algebra::Expression; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_and_expression( &mut self, left: &Expression, diff --git a/chrontext/src/preparing/expressions/binary_ordinary_expression.rs b/chrontext/src/preparing/expressions/binary_ordinary_expression.rs index 9e81635..c53d76f 100644 --- a/chrontext/src/preparing/expressions/binary_ordinary_expression.rs +++ b/chrontext/src/preparing/expressions/binary_ordinary_expression.rs @@ -1,4 +1,4 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::expressions::EXPrepReturn; use crate::query_context::{Context, PathEntry}; @@ -17,7 +17,7 @@ pub enum BinaryOrdinaryOperator { Equal, } -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_binary_ordinary_expression( &mut self, left: &Expression, diff --git a/chrontext/src/preparing/expressions/coalesce_expression.rs b/chrontext/src/preparing/expressions/coalesce_expression.rs index dc6c8e3..6d4f647 100644 --- a/chrontext/src/preparing/expressions/coalesce_expression.rs +++ b/chrontext/src/preparing/expressions/coalesce_expression.rs @@ -1,11 +1,11 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::expressions::EXPrepReturn; use crate::query_context::{Context, PathEntry}; use spargebra::algebra::Expression; use std::collections::HashMap; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_coalesce_expression( &mut self, wrapped: &Vec, diff --git a/chrontext/src/preparing/expressions/exists_expression.rs b/chrontext/src/preparing/expressions/exists_expression.rs index 28681d7..27f8d64 100644 --- a/chrontext/src/preparing/expressions/exists_expression.rs +++ b/chrontext/src/preparing/expressions/exists_expression.rs @@ -1,10 +1,10 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::expressions::EXPrepReturn; use crate::query_context::{Context, PathEntry}; use spargebra::algebra::GraphPattern; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_exists_expression( &mut self, wrapped: &GraphPattern, diff --git a/chrontext/src/preparing/expressions/function_call_expression.rs b/chrontext/src/preparing/expressions/function_call_expression.rs index 77f864a..1a1347d 100644 --- a/chrontext/src/preparing/expressions/function_call_expression.rs +++ b/chrontext/src/preparing/expressions/function_call_expression.rs @@ -1,11 +1,11 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::expressions::EXPrepReturn; use crate::query_context::{Context, PathEntry}; use spargebra::algebra::{Expression, Function}; use std::collections::HashMap; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_function_call_expression( &mut self, _fun: &Function, diff --git a/chrontext/src/preparing/expressions/if_expression.rs b/chrontext/src/preparing/expressions/if_expression.rs index 7d839d6..1ada9d5 100644 --- a/chrontext/src/preparing/expressions/if_expression.rs +++ b/chrontext/src/preparing/expressions/if_expression.rs @@ -1,10 +1,10 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::expressions::EXPrepReturn; use crate::query_context::{Context, PathEntry}; use spargebra::algebra::Expression; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_if_expression( &mut self, left: &Expression, diff --git a/chrontext/src/preparing/expressions/in_expression.rs b/chrontext/src/preparing/expressions/in_expression.rs index c45c7b9..f4db573 100644 --- a/chrontext/src/preparing/expressions/in_expression.rs +++ b/chrontext/src/preparing/expressions/in_expression.rs @@ -1,10 +1,10 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::expressions::EXPrepReturn; use crate::query_context::{Context, PathEntry}; use spargebra::algebra::Expression; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_in_expression( &mut self, left: &Expression, diff --git a/chrontext/src/preparing/expressions/not_expression.rs b/chrontext/src/preparing/expressions/not_expression.rs index c87ac66..d6f381a 100644 --- a/chrontext/src/preparing/expressions/not_expression.rs +++ b/chrontext/src/preparing/expressions/not_expression.rs @@ -1,10 +1,10 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::expressions::EXPrepReturn; use crate::query_context::{Context, PathEntry}; use spargebra::algebra::Expression; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_not_expression( &mut self, wrapped: &Expression, diff --git a/chrontext/src/preparing/expressions/or_expression.rs b/chrontext/src/preparing/expressions/or_expression.rs index 0f772f0..a5c3a07 100644 --- a/chrontext/src/preparing/expressions/or_expression.rs +++ b/chrontext/src/preparing/expressions/or_expression.rs @@ -1,10 +1,10 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::expressions::EXPrepReturn; use crate::query_context::{Context, PathEntry}; use spargebra::algebra::Expression; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_or_expression( &mut self, left: &Expression, diff --git a/chrontext/src/preparing/expressions/unary_ordinary_expression.rs b/chrontext/src/preparing/expressions/unary_ordinary_expression.rs index 03d14d5..10bb32c 100644 --- a/chrontext/src/preparing/expressions/unary_ordinary_expression.rs +++ b/chrontext/src/preparing/expressions/unary_ordinary_expression.rs @@ -1,4 +1,4 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::expressions::EXPrepReturn; use crate::query_context::{Context, PathEntry}; @@ -9,7 +9,7 @@ pub enum UnaryOrdinaryOperator { UnaryMinus, } -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_unary_ordinary_expression( &mut self, wrapped: &Expression, diff --git a/chrontext/src/preparing/graph_patterns.rs b/chrontext/src/preparing/graph_patterns.rs index 73b529f..760b151 100644 --- a/chrontext/src/preparing/graph_patterns.rs +++ b/chrontext/src/preparing/graph_patterns.rs @@ -17,10 +17,10 @@ mod sliced_pattern; mod union_pattern; mod values_pattern; -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::query_context::Context; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use log::debug; use spargebra::algebra::GraphPattern; use std::collections::HashMap; @@ -28,11 +28,11 @@ use std::collections::HashMap; #[derive(Debug)] pub struct GPPrepReturn { pub fail_groupby_complex_query: bool, - pub time_series_queries: HashMap>, + pub time_series_queries: HashMap>, } impl GPPrepReturn { - fn new(time_series_queries: HashMap>) -> GPPrepReturn { + fn new(time_series_queries: HashMap>) -> GPPrepReturn { GPPrepReturn { fail_groupby_complex_query: false, time_series_queries, @@ -57,7 +57,7 @@ impl GPPrepReturn { } } -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_graph_pattern( &mut self, graph_pattern: &GraphPattern, diff --git a/chrontext/src/preparing/graph_patterns/bgp_pattern.rs b/chrontext/src/preparing/graph_patterns/bgp_pattern.rs index 477d186..b5b94f5 100644 --- a/chrontext/src/preparing/graph_patterns/bgp_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/bgp_pattern.rs @@ -1,11 +1,11 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::preparing::graph_patterns::GPPrepReturn; use crate::preparing::synchronization::create_identity_synchronized_queries; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use std::collections::HashMap; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub(crate) fn prepare_bgp( &mut self, try_groupby_complex_query: bool, @@ -16,7 +16,7 @@ impl TimeSeriesQueryPrepper { for tsq in &self.basic_time_series_queries { if let Some(dp_ctx) = &tsq.data_point_variable { if &dp_ctx.context == &bgp_context { - local_tsqs.push(TimeSeriesQuery::Basic(tsq.clone())); + local_tsqs.push(TimeseriesQuery::Basic(tsq.clone())); } } } diff --git a/chrontext/src/preparing/graph_patterns/distinct_pattern.rs b/chrontext/src/preparing/graph_patterns/distinct_pattern.rs index 69aa1a1..909388f 100644 --- a/chrontext/src/preparing/graph_patterns/distinct_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/distinct_pattern.rs @@ -1,4 +1,4 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use log::debug; use crate::combiner::solution_mapping::SolutionMappings; @@ -6,7 +6,7 @@ use crate::preparing::graph_patterns::GPPrepReturn; use crate::query_context::{Context, PathEntry}; use spargebra::algebra::GraphPattern; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_distinct( &mut self, inner: &GraphPattern, diff --git a/chrontext/src/preparing/graph_patterns/extend_pattern.rs b/chrontext/src/preparing/graph_patterns/extend_pattern.rs index b936b6d..f17549a 100644 --- a/chrontext/src/preparing/graph_patterns/extend_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/extend_pattern.rs @@ -1,14 +1,14 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::find_query_variables::find_all_used_variables_in_expression; use crate::preparing::graph_patterns::GPPrepReturn; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use oxrdf::Variable; use spargebra::algebra::{Expression, GraphPattern}; use std::collections::HashSet; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub(crate) fn prepare_extend( &mut self, inner: &GraphPattern, @@ -66,7 +66,7 @@ impl TimeSeriesQueryPrepper { inner_prepare.time_series_queries.remove(&c); } let new_tsq = - TimeSeriesQuery::ExpressionAs(Box::new(inner_tsq), var.clone(), expr.clone()); + TimeseriesQuery::ExpressionAs(Box::new(inner_tsq), var.clone(), expr.clone()); if !inner_prepare.time_series_queries.contains_key(context) { inner_prepare .time_series_queries diff --git a/chrontext/src/preparing/graph_patterns/filter_expression_rewrites.rs b/chrontext/src/preparing/graph_patterns/filter_expression_rewrites.rs index 376cb3e..d2053f8 100644 --- a/chrontext/src/preparing/graph_patterns/filter_expression_rewrites.rs +++ b/chrontext/src/preparing/graph_patterns/filter_expression_rewrites.rs @@ -1,7 +1,7 @@ use crate::change_types::ChangeType; use crate::pushdown_setting::PushdownSetting; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use oxrdf::Literal; use spargebra::algebra::Expression; use std::collections::HashSet; @@ -34,7 +34,7 @@ impl RecursiveRewriteReturn { } pub(crate) fn rewrite_filter_expression( - tsq: &TimeSeriesQuery, + tsq: &TimeseriesQuery, expression: &Expression, required_change_direction: &ChangeType, context: &Context, @@ -53,7 +53,7 @@ pub(crate) fn rewrite_filter_expression( } pub(crate) fn try_recursive_rewrite_expression( - tsq: &TimeSeriesQuery, + tsq: &TimeseriesQuery, static_rewrite_conjunction: &Option>, expression: &Expression, required_change_direction: &ChangeType, diff --git a/chrontext/src/preparing/graph_patterns/filter_pattern.rs b/chrontext/src/preparing/graph_patterns/filter_pattern.rs index 7687aa7..fe2960c 100644 --- a/chrontext/src/preparing/graph_patterns/filter_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/filter_pattern.rs @@ -1,14 +1,14 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::change_types::ChangeType; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::graph_patterns::filter_expression_rewrites::rewrite_filter_expression; use crate::preparing::graph_patterns::GPPrepReturn; use crate::query_context::{Context, PathEntry}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use spargebra::algebra::{Expression, GraphPattern}; use std::collections::HashMap; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_filter( &mut self, expression: &Expression, @@ -58,7 +58,7 @@ impl TimeSeriesQueryPrepper { return GPPrepReturn::fail_groupby_complex_query(); } if let Some(expr) = time_series_condition { - out_tsq_vec.push(TimeSeriesQuery::Filtered(Box::new(t), expr)); + out_tsq_vec.push(TimeseriesQuery::Filtered(Box::new(t), expr)); } else { out_tsq_vec.push(t); } diff --git a/chrontext/src/preparing/graph_patterns/graph_pattern.rs b/chrontext/src/preparing/graph_patterns/graph_pattern.rs index 69e215d..101fc5d 100644 --- a/chrontext/src/preparing/graph_patterns/graph_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/graph_pattern.rs @@ -1,4 +1,4 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use log::debug; use crate::combiner::solution_mapping::SolutionMappings; @@ -6,7 +6,7 @@ use crate::preparing::graph_patterns::GPPrepReturn; use crate::query_context::{Context, PathEntry}; use spargebra::algebra::GraphPattern; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_graph( &mut self, inner: &GraphPattern, diff --git a/chrontext/src/preparing/graph_patterns/group_pattern.rs b/chrontext/src/preparing/graph_patterns/group_pattern.rs index 33d73e9..0ea35eb 100644 --- a/chrontext/src/preparing/graph_patterns/group_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/group_pattern.rs @@ -2,13 +2,13 @@ use crate::query_context::{Context, PathEntry}; use log::debug; use std::collections::{HashMap, HashSet}; -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::constants::GROUPING_COL; use crate::find_query_variables::find_all_used_variables_in_aggregate_expression; use crate::preparing::graph_patterns::GPPrepReturn; use crate::pushdown_setting::PushdownSetting; -use crate::timeseries_query::{GroupedTimeSeriesQuery, TimeSeriesQuery}; +use crate::timeseries_query::{GroupedTimeseriesQuery, TimeseriesQuery}; use oxrdf::Variable; use polars::prelude::DataFrameJoinOps; use polars::prelude::IntoLazy; @@ -16,7 +16,7 @@ use polars_core::prelude::{JoinArgs, JoinType, UniqueKeepStrategy}; use polars_core::series::Series; use spargebra::algebra::{AggregateExpression, GraphPattern}; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_group( &mut self, graph_pattern: &GraphPattern, @@ -64,7 +64,7 @@ impl TimeSeriesQueryPrepper { } } //TODO: For OPC UA we must ensure that mapping df is 1:1 with identities, or alternatively group on these - tsq = TimeSeriesQuery::Grouped(GroupedTimeSeriesQuery { + tsq = TimeseriesQuery::Grouped(GroupedTimeseriesQuery { context: context.clone(), tsq: Box::new(tsq), by: keep_by, @@ -122,7 +122,7 @@ impl TimeSeriesQueryPrepper { } fn check_aggregations_are_in_scope( - tsq: &TimeSeriesQuery, + tsq: &TimeseriesQuery, context: &Context, aggregations: &Vec<(Variable, AggregateExpression)>, ) -> bool { @@ -144,12 +144,12 @@ fn check_aggregations_are_in_scope( } fn add_basic_groupby_mapping_values( - tsq: TimeSeriesQuery, + tsq: TimeseriesQuery, solution_mappings: &mut SolutionMappings, grouping_col: &str, -) -> TimeSeriesQuery { +) -> TimeseriesQuery { match tsq { - TimeSeriesQuery::Basic(b) => { + TimeseriesQuery::Basic(b) => { let by_vec = vec![ grouping_col, b.identifier_variable.as_ref().unwrap().as_str(), @@ -163,9 +163,9 @@ fn add_basic_groupby_mapping_values( .unwrap() .select(by_vec) .unwrap(); - TimeSeriesQuery::GroupedBasic(b, mapping_values, grouping_col.to_string()) + TimeseriesQuery::GroupedBasic(b, mapping_values, grouping_col.to_string()) } - TimeSeriesQuery::Filtered(tsq, f) => TimeSeriesQuery::Filtered( + TimeseriesQuery::Filtered(tsq, f) => TimeseriesQuery::Filtered( Box::new(add_basic_groupby_mapping_values( *tsq, solution_mappings, @@ -173,7 +173,7 @@ fn add_basic_groupby_mapping_values( )), f, ), - TimeSeriesQuery::InnerSynchronized(inners, syncs) => { + TimeseriesQuery::InnerSynchronized(inners, syncs) => { let mut tsq_added = vec![]; for tsq in inners { tsq_added.push(Box::new(add_basic_groupby_mapping_values( @@ -182,9 +182,9 @@ fn add_basic_groupby_mapping_values( grouping_col, ))) } - TimeSeriesQuery::InnerSynchronized(tsq_added, syncs) + TimeseriesQuery::InnerSynchronized(tsq_added, syncs) } - TimeSeriesQuery::ExpressionAs(tsq, v, e) => TimeSeriesQuery::ExpressionAs( + TimeseriesQuery::ExpressionAs(tsq, v, e) => TimeseriesQuery::ExpressionAs( Box::new(add_basic_groupby_mapping_values( *tsq, solution_mappings, @@ -193,10 +193,10 @@ fn add_basic_groupby_mapping_values( v, e, ), - TimeSeriesQuery::Grouped(_) => { + TimeseriesQuery::Grouped(_) => { panic!("Should never happen") } - TimeSeriesQuery::GroupedBasic(_, _, _) => { + TimeseriesQuery::GroupedBasic(_, _, _) => { panic!("Should never happen") } } diff --git a/chrontext/src/preparing/graph_patterns/join_pattern.rs b/chrontext/src/preparing/graph_patterns/join_pattern.rs index 540134f..78df48d 100644 --- a/chrontext/src/preparing/graph_patterns/join_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/join_pattern.rs @@ -1,11 +1,11 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::graph_patterns::GPPrepReturn; use crate::query_context::{Context, PathEntry}; use log::debug; use spargebra::algebra::GraphPattern; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_join( &mut self, left: &GraphPattern, diff --git a/chrontext/src/preparing/graph_patterns/left_join_pattern.rs b/chrontext/src/preparing/graph_patterns/left_join_pattern.rs index 4dcfdb3..03c9857 100644 --- a/chrontext/src/preparing/graph_patterns/left_join_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/left_join_pattern.rs @@ -1,11 +1,11 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::graph_patterns::GPPrepReturn; use crate::query_context::{Context, PathEntry}; use log::debug; use spargebra::algebra::{Expression, GraphPattern}; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_left_join( &mut self, left: &GraphPattern, diff --git a/chrontext/src/preparing/graph_patterns/minus_pattern.rs b/chrontext/src/preparing/graph_patterns/minus_pattern.rs index 12dccb5..9763fd0 100644 --- a/chrontext/src/preparing/graph_patterns/minus_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/minus_pattern.rs @@ -1,11 +1,11 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::graph_patterns::GPPrepReturn; use crate::query_context::{Context, PathEntry}; use log::debug; use spargebra::algebra::GraphPattern; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_minus( &mut self, left: &GraphPattern, diff --git a/chrontext/src/preparing/graph_patterns/order_by_pattern.rs b/chrontext/src/preparing/graph_patterns/order_by_pattern.rs index 8bdb7d4..50999d0 100644 --- a/chrontext/src/preparing/graph_patterns/order_by_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/order_by_pattern.rs @@ -1,4 +1,4 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::preparing::graph_patterns::GPPrepReturn; use crate::combiner::solution_mapping::SolutionMappings; @@ -6,7 +6,7 @@ use crate::query_context::{Context, PathEntry}; use log::debug; use spargebra::algebra::{GraphPattern, OrderExpression}; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_order_by( &mut self, inner: &GraphPattern, diff --git a/chrontext/src/preparing/graph_patterns/path_pattern.rs b/chrontext/src/preparing/graph_patterns/path_pattern.rs index a71c6e5..11d0bc1 100644 --- a/chrontext/src/preparing/graph_patterns/path_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/path_pattern.rs @@ -1,10 +1,10 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::preparing::graph_patterns::GPPrepReturn; use spargebra::algebra::PropertyPathExpression; use spargebra::term::TermPattern; use std::collections::HashMap; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { //We assume that all paths have been prepared so as to not contain any datapoint, timestamp, or data value. //These should have been split into ordinary triples. pub fn prepare_path( diff --git a/chrontext/src/preparing/graph_patterns/project_pattern.rs b/chrontext/src/preparing/graph_patterns/project_pattern.rs index 031d395..9c642a7 100644 --- a/chrontext/src/preparing/graph_patterns/project_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/project_pattern.rs @@ -1,4 +1,4 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::graph_patterns::GPPrepReturn; use crate::query_context::{Context, PathEntry}; @@ -6,7 +6,7 @@ use log::debug; use oxrdf::Variable; use spargebra::algebra::GraphPattern; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_project( &mut self, inner: &GraphPattern, diff --git a/chrontext/src/preparing/graph_patterns/reduced_pattern.rs b/chrontext/src/preparing/graph_patterns/reduced_pattern.rs index 1c7a547..8f30d9b 100644 --- a/chrontext/src/preparing/graph_patterns/reduced_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/reduced_pattern.rs @@ -1,11 +1,11 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::graph_patterns::GPPrepReturn; use crate::query_context::{Context, PathEntry}; use log::debug; use spargebra::algebra::GraphPattern; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_reduced( &mut self, inner: &GraphPattern, diff --git a/chrontext/src/preparing/graph_patterns/service_pattern.rs b/chrontext/src/preparing/graph_patterns/service_pattern.rs index 56aec4a..1153ee4 100644 --- a/chrontext/src/preparing/graph_patterns/service_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/service_pattern.rs @@ -1,9 +1,9 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use std::collections::HashMap; use crate::preparing::graph_patterns::GPPrepReturn; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_service(&mut self) -> GPPrepReturn { //Service pattern should not contain anything dynamic GPPrepReturn::new(HashMap::new()) diff --git a/chrontext/src/preparing/graph_patterns/sliced_pattern.rs b/chrontext/src/preparing/graph_patterns/sliced_pattern.rs index 77f8fd3..2c157ff 100644 --- a/chrontext/src/preparing/graph_patterns/sliced_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/sliced_pattern.rs @@ -1,11 +1,11 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::graph_patterns::GPPrepReturn; use crate::query_context::{Context, PathEntry}; use log::debug; use spargebra::algebra::GraphPattern; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_slice( &mut self, inner: &GraphPattern, diff --git a/chrontext/src/preparing/graph_patterns/union_pattern.rs b/chrontext/src/preparing/graph_patterns/union_pattern.rs index 61dcdc5..d264da8 100644 --- a/chrontext/src/preparing/graph_patterns/union_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/union_pattern.rs @@ -1,11 +1,11 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use crate::combiner::solution_mapping::SolutionMappings; use crate::preparing::graph_patterns::GPPrepReturn; use crate::query_context::{Context, PathEntry}; use log::debug; use spargebra::algebra::GraphPattern; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_union( &mut self, left: &GraphPattern, diff --git a/chrontext/src/preparing/graph_patterns/values_pattern.rs b/chrontext/src/preparing/graph_patterns/values_pattern.rs index b884b81..897a409 100644 --- a/chrontext/src/preparing/graph_patterns/values_pattern.rs +++ b/chrontext/src/preparing/graph_patterns/values_pattern.rs @@ -1,11 +1,11 @@ -use super::TimeSeriesQueryPrepper; +use super::TimeseriesQueryPrepper; use std::collections::HashMap; use crate::preparing::graph_patterns::GPPrepReturn; use oxrdf::Variable; use spargebra::term::GroundTerm; -impl TimeSeriesQueryPrepper { +impl TimeseriesQueryPrepper { pub fn prepare_values( &mut self, _variables: &Vec, diff --git a/chrontext/src/preparing/synchronization.rs b/chrontext/src/preparing/synchronization.rs index 3a984db..4b36948 100644 --- a/chrontext/src/preparing/synchronization.rs +++ b/chrontext/src/preparing/synchronization.rs @@ -1,10 +1,10 @@ -use crate::timeseries_query::{Synchronizer, TimeSeriesQuery}; +use crate::timeseries_query::{Synchronizer, TimeseriesQuery}; use oxrdf::Variable; use std::collections::HashSet; pub fn create_identity_synchronized_queries( - mut tsqs: Vec, -) -> Vec { + mut tsqs: Vec, +) -> Vec { let mut out_queries = vec![]; while tsqs.len() > 1 { let mut queries_to_synchronize = vec![]; @@ -34,7 +34,7 @@ pub fn create_identity_synchronized_queries( tsqs = keep_tsqs; if !queries_to_synchronize.is_empty() { queries_to_synchronize.push(Box::new(first_query)); - out_queries.push(TimeSeriesQuery::InnerSynchronized( + out_queries.push(TimeseriesQuery::InnerSynchronized( queries_to_synchronize, vec![Synchronizer::Identity( first_query_timestamp_variables_set diff --git a/chrontext/src/rewriting.rs b/chrontext/src/rewriting.rs index fb6d844..127238e 100644 --- a/chrontext/src/rewriting.rs +++ b/chrontext/src/rewriting.rs @@ -8,7 +8,7 @@ mod subqueries; use crate::constraints::{Constraint, VariableConstraints}; use crate::query_context::Context; use crate::rewriting::expressions::ExReturn; -use crate::timeseries_query::BasicTimeSeriesQuery; +use crate::timeseries_query::BasicTimeseriesQuery; use spargebra::algebra::Expression; use spargebra::term::Variable; use spargebra::Query; @@ -19,7 +19,7 @@ pub struct StaticQueryRewriter { variable_counter: u16, additional_projections: HashSet, variable_constraints: VariableConstraints, - basic_time_series_queries: Vec, + basic_time_series_queries: Vec, static_subqueries: HashMap, rewritten_filters: HashMap, is_hybrid: bool, @@ -43,7 +43,7 @@ impl StaticQueryRewriter { query: Query, ) -> ( HashMap, - Vec, + Vec, HashMap, ) { if !self.is_hybrid { diff --git a/chrontext/src/rewriting/graph_patterns/bgp_pattern.rs b/chrontext/src/rewriting/graph_patterns/bgp_pattern.rs index cdbed59..dde2189 100644 --- a/chrontext/src/rewriting/graph_patterns/bgp_pattern.rs +++ b/chrontext/src/rewriting/graph_patterns/bgp_pattern.rs @@ -5,7 +5,7 @@ use crate::constants::{ use crate::constraints::{Constraint, VariableConstraints}; use crate::query_context::{Context, PathEntry, VariableInContext}; use crate::rewriting::graph_patterns::GPReturn; -use crate::timeseries_query::BasicTimeSeriesQuery; +use crate::timeseries_query::BasicTimeseriesQuery; use oxrdf::{NamedNode, Variable}; use spargebra::algebra::GraphPattern; use spargebra::term::{NamedNodePattern, TermPattern, TriplePattern}; @@ -177,8 +177,8 @@ impl StaticQueryRewriter { datatype_variable: &Variable, resource_variable: &Variable, context: &Context, - ) -> BasicTimeSeriesQuery { - let mut ts_query = BasicTimeSeriesQuery::new_empty(); + ) -> BasicTimeseriesQuery { + let mut ts_query = BasicTimeseriesQuery::new_empty(); ts_query.identifier_variable = Some(time_series_id_variable.clone()); ts_query.datatype_variable = Some(datatype_variable.clone()); ts_query.resource_variable = Some(resource_variable.clone()); @@ -191,7 +191,7 @@ impl StaticQueryRewriter { } fn process_dynamic_triples( - local_basic_tsqs: &mut Vec, + local_basic_tsqs: &mut Vec, dynamic_triples: Vec<&TriplePattern>, context: &Context, ) { diff --git a/chrontext/src/timeseries_database.rs b/chrontext/src/timeseries_database.rs index 46dae87..34c9161 100644 --- a/chrontext/src/timeseries_database.rs +++ b/chrontext/src/timeseries_database.rs @@ -5,9 +5,9 @@ pub mod simple_in_memory_timeseries; pub mod timeseries_sql_rewrite; use crate::timeseries_database::timeseries_sql_rewrite::{ - TimeSeriesQueryToSQLError, TimeSeriesQueryToSQLTransformer, TimeSeriesTable, + TimeseriesQueryToSQLError, TimeseriesQueryToSQLTransformer, TimeseriesTable, }; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_query::TimeseriesQuery; use async_trait::async_trait; use log::debug; use polars::frame::DataFrame; @@ -15,8 +15,8 @@ use sea_query::{BigQueryQueryBuilder, PostgresQueryBuilder}; use std::error::Error; #[async_trait] -pub trait TimeSeriesQueryable: Send { - async fn execute(&mut self, tsq: &TimeSeriesQuery) -> Result>; +pub trait TimeseriesQueryable: Send { + async fn execute(&mut self, tsq: &TimeseriesQuery) -> Result>; fn allow_compound_timeseries_queries(&self) -> bool; } @@ -26,15 +26,15 @@ pub enum DatabaseType { Dremio, } -pub trait TimeSeriesSQLQueryable { +pub trait TimeseriesSQLQueryable { fn get_sql_string( &self, - tsq: &TimeSeriesQuery, + tsq: &TimeseriesQuery, database_type: DatabaseType, - ) -> Result { + ) -> Result { let query_string; { - let transformer = TimeSeriesQueryToSQLTransformer::new( + let transformer = TimeseriesQueryToSQLTransformer::new( &self.get_time_series_tables(), database_type.clone(), ); @@ -49,5 +49,5 @@ pub trait TimeSeriesSQLQueryable { Ok(query_string) } - fn get_time_series_tables(&self) -> &Vec; + fn get_time_series_tables(&self) -> &Vec; } diff --git a/chrontext/src/timeseries_database/arrow_flight_sql_database.rs b/chrontext/src/timeseries_database/arrow_flight_sql_database.rs index 60b0216..fdb6ae8 100644 --- a/chrontext/src/timeseries_database/arrow_flight_sql_database.rs +++ b/chrontext/src/timeseries_database/arrow_flight_sql_database.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::timeseries_database::{DatabaseType, TimeSeriesQueryable, TimeSeriesSQLQueryable}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_database::{DatabaseType, TimeseriesQueryable, TimeseriesSQLQueryable}; +use crate::timeseries_query::TimeseriesQuery; use arrow2::io::flight as flight2; use arrow_format::flight::data::{FlightDescriptor, FlightInfo, HandshakeRequest}; use async_trait::async_trait; @@ -22,7 +22,7 @@ use polars::frame::DataFrame; use polars_core::utils::accumulate_dataframes_vertical; use crate::timeseries_database::timeseries_sql_rewrite::{ - TimeSeriesQueryToSQLError, TimeSeriesTable, + TimeseriesQueryToSQLError, TimeseriesTable, }; use arrow_format::flight::service::flight_service_client::FlightServiceClient; use arrow_format::ipc::planus::ReadAsRoot; @@ -44,7 +44,7 @@ use tonic::{IntoRequest, Request, Response, Status}; pub enum ArrowFlightSQLError { TonicStatus(#[from] Status), TransportError(#[from] tonic::transport::Error), - TranslationError(#[from] TimeSeriesQueryToSQLError), + TranslationError(#[from] TimeseriesQueryToSQLError), ArrowError(#[from] ArrowError), PolarsError(#[from] PolarsError), } @@ -77,7 +77,7 @@ pub struct ArrowFlightSQLDatabase { password: String, token: Option, cookies: Option>, - time_series_tables: Vec, + time_series_tables: Vec, } impl ArrowFlightSQLDatabase { @@ -85,7 +85,7 @@ impl ArrowFlightSQLDatabase { endpoint: &str, username: &str, password: &str, - time_series_tables: Vec, + time_series_tables: Vec, ) -> Result { let mut db = ArrowFlightSQLDatabase { endpoint: endpoint.into(), @@ -221,8 +221,8 @@ impl ArrowFlightSQLDatabase { } #[async_trait] -impl TimeSeriesQueryable for ArrowFlightSQLDatabase { - async fn execute(&mut self, tsq: &TimeSeriesQuery) -> Result> { +impl TimeseriesQueryable for ArrowFlightSQLDatabase { + async fn execute(&mut self, tsq: &TimeseriesQuery) -> Result> { let query_string = self.get_sql_string(tsq, DatabaseType::Dremio)?; Ok(self.execute_sql_query(query_string).await?) } @@ -232,8 +232,8 @@ impl TimeSeriesQueryable for ArrowFlightSQLDatabase { } } -impl TimeSeriesSQLQueryable for ArrowFlightSQLDatabase { - fn get_time_series_tables(&self) -> &Vec { +impl TimeseriesSQLQueryable for ArrowFlightSQLDatabase { + fn get_time_series_tables(&self) -> &Vec { &self.time_series_tables } } diff --git a/chrontext/src/timeseries_database/bigquery_database.rs b/chrontext/src/timeseries_database/bigquery_database.rs index 2a60618..9d6caa1 100644 --- a/chrontext/src/timeseries_database/bigquery_database.rs +++ b/chrontext/src/timeseries_database/bigquery_database.rs @@ -1,8 +1,8 @@ use crate::timeseries_database::timeseries_sql_rewrite::{ - TimeSeriesQueryToSQLError, TimeSeriesTable, + TimeseriesQueryToSQLError, TimeseriesTable, }; -use crate::timeseries_database::{DatabaseType, TimeSeriesQueryable, TimeSeriesSQLQueryable}; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_database::{DatabaseType, TimeseriesQueryable, TimeseriesSQLQueryable}; +use crate::timeseries_query::TimeseriesQuery; use async_trait::async_trait; use bigquery_polars::{BigQueryExecutor, Client}; use polars::prelude::PolarsError; @@ -18,7 +18,7 @@ use tonic::Status; pub enum BigQueryError { TonicStatus(#[from] Status), TransportError(#[from] tonic::transport::Error), - TranslationError(#[from] TimeSeriesQueryToSQLError), + TranslationError(#[from] TimeseriesQueryToSQLError), ArrowError(#[from] ArrowError), PolarsError(#[from] PolarsError), } @@ -46,11 +46,11 @@ impl Display for BigQueryError { } pub struct BigQueryDatabase { gcp_sa_key: String, - time_series_tables: Vec, + time_series_tables: Vec, } impl BigQueryDatabase { - pub fn new(gcp_sa_key: String, time_series_tables: Vec) -> BigQueryDatabase { + pub fn new(gcp_sa_key: String, time_series_tables: Vec) -> BigQueryDatabase { BigQueryDatabase { gcp_sa_key, time_series_tables, @@ -59,8 +59,8 @@ impl BigQueryDatabase { } #[async_trait] -impl TimeSeriesQueryable for BigQueryDatabase { - async fn execute(&mut self, tsq: &TimeSeriesQuery) -> Result> { +impl TimeseriesQueryable for BigQueryDatabase { + async fn execute(&mut self, tsq: &TimeseriesQuery) -> Result> { let query_string = self.get_sql_string(tsq, DatabaseType::BigQuery)?; // The following code is based on https://github.com/DataTreehouse/connector-x/blob/main/connectorx/src/sources/bigquery/mod.rs @@ -113,8 +113,8 @@ impl TimeSeriesQueryable for BigQueryDatabase { } } -impl TimeSeriesSQLQueryable for BigQueryDatabase { - fn get_time_series_tables(&self) -> &Vec { +impl TimeseriesSQLQueryable for BigQueryDatabase { + fn get_time_series_tables(&self) -> &Vec { &self.time_series_tables } } diff --git a/chrontext/src/timeseries_database/opcua_history_read.rs b/chrontext/src/timeseries_database/opcua_history_read.rs index 7cd84ec..92185c5 100644 --- a/chrontext/src/timeseries_database/opcua_history_read.rs +++ b/chrontext/src/timeseries_database/opcua_history_read.rs @@ -1,7 +1,7 @@ use crate::constants::DATETIME_AS_SECONDS; use crate::query_context::Context; -use crate::timeseries_database::TimeSeriesQueryable; -use crate::timeseries_query::TimeSeriesQuery; +use crate::timeseries_database::TimeseriesQueryable; +use crate::timeseries_query::TimeseriesQuery; use async_trait::async_trait; use opcua::client::prelude::{ AggregateConfiguration, AttributeService, ByteString, Client, ClientBuilder, DateTime, @@ -41,7 +41,7 @@ pub struct OPCUAHistoryRead { #[derive(Debug)] pub enum OPCUAHistoryReadError { InvalidNodeIdError(String), - TimeSeriesQueryTypeNotSupported, + TimeseriesQueryTypeNotSupported, } impl Display for OPCUAHistoryReadError { @@ -50,7 +50,7 @@ impl Display for OPCUAHistoryReadError { OPCUAHistoryReadError::InvalidNodeIdError(s) => { write!(f, "Invalid NodeId {}", s) } - OPCUAHistoryReadError::TimeSeriesQueryTypeNotSupported => { + OPCUAHistoryReadError::TimeseriesQueryTypeNotSupported => { write!(f, "Only grouped and basic query types are supported") } } @@ -92,8 +92,8 @@ impl OPCUAHistoryRead { } #[async_trait] -impl TimeSeriesQueryable for OPCUAHistoryRead { - async fn execute(&mut self, tsq: &TimeSeriesQuery) -> Result> { +impl TimeseriesQueryable for OPCUAHistoryRead { + async fn execute(&mut self, tsq: &TimeseriesQuery) -> Result> { validate_tsq(tsq, true, false)?; let session = self.session.write(); let start_time = find_time(tsq, &FindTime::Start); @@ -106,7 +106,7 @@ impl TimeSeriesQueryable for OPCUAHistoryRead { let mut colnames_identifiers = vec![]; let mut grouping_col_lookup = HashMap::new(); let mut grouping_col_name = None; - if let TimeSeriesQuery::Grouped(grouped) = tsq { + if let TimeseriesQuery::Grouped(grouped) = tsq { let (colname, processed_details_some) = create_read_processed_details(tsq, start_time, end_time, &grouped.context); processed_details = Some(processed_details_some); @@ -285,31 +285,31 @@ impl TimeSeriesQueryable for OPCUAHistoryRead { } fn validate_tsq( - tsq: &TimeSeriesQuery, + tsq: &TimeseriesQuery, toplevel: bool, inside_grouping: bool, ) -> Result<(), OPCUAHistoryReadError> { match tsq { - TimeSeriesQuery::Basic(_) => Ok(()), - TimeSeriesQuery::Filtered(f, _) => validate_tsq(f, false, inside_grouping), - TimeSeriesQuery::Grouped(g) => { + TimeseriesQuery::Basic(_) => Ok(()), + TimeseriesQuery::Filtered(f, _) => validate_tsq(f, false, inside_grouping), + TimeseriesQuery::Grouped(g) => { if !toplevel { - Err(OPCUAHistoryReadError::TimeSeriesQueryTypeNotSupported) + Err(OPCUAHistoryReadError::TimeseriesQueryTypeNotSupported) } else { validate_tsq(&g.tsq, false, true) } } - TimeSeriesQuery::GroupedBasic(_, _, _) => { + TimeseriesQuery::GroupedBasic(_, _, _) => { if inside_grouping { Ok(()) } else { - Err(OPCUAHistoryReadError::TimeSeriesQueryTypeNotSupported) + Err(OPCUAHistoryReadError::TimeseriesQueryTypeNotSupported) } } - TimeSeriesQuery::InnerSynchronized(_, _) => { - Err(OPCUAHistoryReadError::TimeSeriesQueryTypeNotSupported) + TimeseriesQuery::InnerSynchronized(_, _) => { + Err(OPCUAHistoryReadError::TimeseriesQueryTypeNotSupported) } - TimeSeriesQuery::ExpressionAs(t, _, _) => validate_tsq(t, false, inside_grouping), + TimeseriesQuery::ExpressionAs(t, _, _) => validate_tsq(t, false, inside_grouping), } } @@ -324,7 +324,7 @@ fn create_raw_details(start_time: DateTime, end_time: DateTime) -> ReadRawModifi } fn create_read_processed_details( - tsq: &TimeSeriesQuery, + tsq: &TimeseriesQuery, start_time: DateTime, end_time: DateTime, context: &Context, @@ -381,8 +381,8 @@ fn history_data_to_series_tuple(hd: HistoryData) -> (Series, Series) { (timestamps, values) } -fn find_aggregate_types(tsq: &TimeSeriesQuery) -> Option> { - if let TimeSeriesQuery::Grouped(grouped) = tsq { +fn find_aggregate_types(tsq: &TimeseriesQuery) -> Option> { + if let TimeseriesQuery::Grouped(grouped) = tsq { let mut nodes = vec![]; for (_, agg) in &grouped.aggregations { let value_var_str = tsq.get_value_variables().get(0).unwrap().variable.as_str(); @@ -457,15 +457,15 @@ enum FindTime { End, } -fn find_time(tsq: &TimeSeriesQuery, find_time: &FindTime) -> DateTime { +fn find_time(tsq: &TimeseriesQuery, find_time: &FindTime) -> DateTime { let mut found_time = None; - let filter = if let TimeSeriesQuery::Grouped(gr) = tsq { - if let TimeSeriesQuery::Filtered(_, filter) = gr.tsq.as_ref() { + let filter = if let TimeseriesQuery::Grouped(gr) = tsq { + if let TimeseriesQuery::Filtered(_, filter) = gr.tsq.as_ref() { Some(filter) } else { None } - } else if let TimeSeriesQuery::Filtered(_, filter) = tsq { + } else if let TimeseriesQuery::Filtered(_, filter) = tsq { Some(filter) } else { None @@ -686,8 +686,8 @@ fn datetime_from_expression( } } -fn find_grouping_interval(tsq: &TimeSeriesQuery, context: &Context) -> Option<(String, f64)> { - if let TimeSeriesQuery::Grouped(grouped) = tsq { +fn find_grouping_interval(tsq: &TimeseriesQuery, context: &Context) -> Option<(String, f64)> { + if let TimeseriesQuery::Grouped(grouped) = tsq { let mut tsf = None; let mut grvar = None; for v in &grouped.by { diff --git a/chrontext/src/timeseries_database/simple_in_memory_timeseries.rs b/chrontext/src/timeseries_database/simple_in_memory_timeseries.rs index fc579a0..0e07a6b 100644 --- a/chrontext/src/timeseries_database/simple_in_memory_timeseries.rs +++ b/chrontext/src/timeseries_database/simple_in_memory_timeseries.rs @@ -4,9 +4,9 @@ use crate::constants::GROUPING_COL; use crate::pushdown_setting::all_pushdowns; use crate::query_context::{Context, PathEntry}; use crate::sparql_database::sparql_endpoint::SparqlEndpoint; -use crate::timeseries_database::TimeSeriesQueryable; +use crate::timeseries_database::TimeseriesQueryable; use crate::timeseries_query::{ - BasicTimeSeriesQuery, GroupedTimeSeriesQuery, Synchronizer, TimeSeriesQuery, + BasicTimeseriesQuery, GroupedTimeseriesQuery, Synchronizer, TimeseriesQuery, }; use async_recursion::async_recursion; use async_trait::async_trait; @@ -23,8 +23,8 @@ pub struct InMemoryTimeseriesDatabase { } #[async_trait] -impl TimeSeriesQueryable for InMemoryTimeseriesDatabase { - async fn execute(&mut self, tsq: &TimeSeriesQuery) -> Result> { +impl TimeseriesQueryable for InMemoryTimeseriesDatabase { + async fn execute(&mut self, tsq: &TimeseriesQuery) -> Result> { self.execute_query(tsq).await } @@ -35,15 +35,15 @@ impl TimeSeriesQueryable for InMemoryTimeseriesDatabase { impl InMemoryTimeseriesDatabase { #[async_recursion] - async fn execute_query(&self, tsq: &TimeSeriesQuery) -> Result> { + async fn execute_query(&self, tsq: &TimeseriesQuery) -> Result> { match tsq { - TimeSeriesQuery::Basic(b) => self.execute_basic(b), - TimeSeriesQuery::Filtered(inner, filter) => self.execute_filtered(inner, filter).await, - TimeSeriesQuery::InnerSynchronized(inners, synchronizers) => { + TimeseriesQuery::Basic(b) => self.execute_basic(b), + TimeseriesQuery::Filtered(inner, filter) => self.execute_filtered(inner, filter).await, + TimeseriesQuery::InnerSynchronized(inners, synchronizers) => { self.execute_inner_synchronized(inners, synchronizers).await } - TimeSeriesQuery::Grouped(grouped) => self.execute_grouped(grouped).await, - TimeSeriesQuery::GroupedBasic(btsq, df, ..) => { + TimeseriesQuery::Grouped(grouped) => self.execute_grouped(grouped).await, + TimeseriesQuery::GroupedBasic(btsq, df, ..) => { let mut basic_df = self.execute_basic(btsq)?; basic_df = basic_df .join( @@ -58,7 +58,7 @@ impl InMemoryTimeseriesDatabase { .unwrap(); Ok(basic_df) } - TimeSeriesQuery::ExpressionAs(tsq, v, e) => { + TimeseriesQuery::ExpressionAs(tsq, v, e) => { let mut df = self.execute_query(tsq).await?; let tmp_context = Context::from_path(vec![PathEntry::Coalesce(13)]); let columns = df @@ -88,7 +88,7 @@ impl InMemoryTimeseriesDatabase { } } - fn execute_basic(&self, btsq: &BasicTimeSeriesQuery) -> Result> { + fn execute_basic(&self, btsq: &BasicTimeseriesQuery) -> Result> { let mut lfs = vec![]; for id in btsq.ids.as_ref().unwrap() { if let Some(df) = self.frames.get(id) { @@ -124,7 +124,7 @@ impl InMemoryTimeseriesDatabase { #[async_recursion] async fn execute_filtered( &self, - tsq: &TimeSeriesQuery, + tsq: &TimeseriesQuery, filter: &Expression, ) -> Result> { let df = self.execute_query(tsq).await?; @@ -158,7 +158,7 @@ impl InMemoryTimeseriesDatabase { async fn execute_grouped( &self, - grouped: &GroupedTimeSeriesQuery, + grouped: &GroupedTimeseriesQuery, ) -> Result> { let df = self.execute_query(&grouped.tsq).await?; let columns = df @@ -216,7 +216,7 @@ impl InMemoryTimeseriesDatabase { async fn execute_inner_synchronized( &self, - inners: &Vec>, + inners: &Vec>, synchronizers: &Vec, ) -> Result> { assert_eq!(synchronizers.len(), 1); diff --git a/chrontext/src/timeseries_database/timeseries_sql_rewrite.rs b/chrontext/src/timeseries_database/timeseries_sql_rewrite.rs index 91e7c34..d5d5841 100644 --- a/chrontext/src/timeseries_database/timeseries_sql_rewrite.rs +++ b/chrontext/src/timeseries_database/timeseries_sql_rewrite.rs @@ -4,7 +4,7 @@ mod partitioning_support; use crate::timeseries_database::timeseries_sql_rewrite::expression_rewrite::SPARQLToSQLExpressionTransformer; use crate::timeseries_database::timeseries_sql_rewrite::partitioning_support::add_partitioned_timestamp_conditions; use crate::timeseries_database::DatabaseType; -use crate::timeseries_query::{BasicTimeSeriesQuery, Synchronizer, TimeSeriesQuery}; +use crate::timeseries_query::{BasicTimeseriesQuery, Synchronizer, TimeseriesQuery}; use oxrdf::{NamedNode, Variable}; use polars_core::datatypes::AnyValue; use polars_core::frame::DataFrame; @@ -24,7 +24,7 @@ const MONTH_PARTITION_COLUMN_NAME: &str = "month_partition_column_name"; const DAY_PARTITION_COLUMN_NAME: &str = "day_partition_column_name"; #[derive(Debug)] -pub enum TimeSeriesQueryToSQLError { +pub enum TimeseriesQueryToSQLError { UnknownVariable(String), UnknownDatatype(String), FoundNonValueInInExpression, @@ -33,25 +33,25 @@ pub enum TimeSeriesQueryToSQLError { TimeseriesResourceNotFound(String, Vec), } -impl Display for TimeSeriesQueryToSQLError { +impl Display for TimeseriesQueryToSQLError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - TimeSeriesQueryToSQLError::UnknownVariable(v) => { + TimeseriesQueryToSQLError::UnknownVariable(v) => { write!(f, "Unknown variable {}", v) } - TimeSeriesQueryToSQLError::UnknownDatatype(d) => { + TimeseriesQueryToSQLError::UnknownDatatype(d) => { write!(f, "Unknown datatype: {}", d) } - TimeSeriesQueryToSQLError::FoundNonValueInInExpression => { + TimeseriesQueryToSQLError::FoundNonValueInInExpression => { write!(f, "In-expression contained non-literal alternative") } - TimeSeriesQueryToSQLError::DatatypeNotSupported(dt) => { + TimeseriesQueryToSQLError::DatatypeNotSupported(dt) => { write!(f, "Datatype not supported: {}", dt) } - TimeSeriesQueryToSQLError::MissingTimeseriesResource => { + TimeseriesQueryToSQLError::MissingTimeseriesResource => { write!(f, "Timeseries value resource name missing") } - TimeSeriesQueryToSQLError::TimeseriesResourceNotFound(resource, alternatives) => { + TimeseriesQueryToSQLError::TimeseriesResourceNotFound(resource, alternatives) => { write!( f, "Timeseries resource {} not found among alternatives {}", @@ -63,7 +63,7 @@ impl Display for TimeSeriesQueryToSQLError { } } -impl Error for TimeSeriesQueryToSQLError {} +impl Error for TimeseriesQueryToSQLError {} #[derive(Clone)] pub(crate) enum Name { @@ -98,7 +98,7 @@ impl Iden for Name { } #[derive(Clone)] -pub struct TimeSeriesTable { +pub struct TimeseriesTable { // Used to identify the table of the time series value pub resource_name: String, pub schema: Option, @@ -112,18 +112,18 @@ pub struct TimeSeriesTable { pub day_column: Option, } -pub struct TimeSeriesQueryToSQLTransformer<'a> { +pub struct TimeseriesQueryToSQLTransformer<'a> { pub partition_support: bool, - pub tables: &'a Vec, + pub tables: &'a Vec, pub database_type: DatabaseType, } -impl TimeSeriesQueryToSQLTransformer<'_> { +impl TimeseriesQueryToSQLTransformer<'_> { pub fn new( - tables: &Vec, + tables: &Vec, database_type: DatabaseType, - ) -> TimeSeriesQueryToSQLTransformer { - TimeSeriesQueryToSQLTransformer { + ) -> TimeseriesQueryToSQLTransformer { + TimeseriesQueryToSQLTransformer { partition_support: check_partitioning_support(tables), tables, database_type, @@ -132,9 +132,9 @@ impl TimeSeriesQueryToSQLTransformer<'_> { pub fn create_query( &self, - tsq: &TimeSeriesQuery, + tsq: &TimeseriesQuery, project_date_partition: bool, - ) -> Result<(SelectStatement, HashSet), TimeSeriesQueryToSQLError> { + ) -> Result<(SelectStatement, HashSet), TimeseriesQueryToSQLError> { let (mut select_statement, map) = self.create_query_nested(tsq, project_date_partition)?; let sort_col; if let Some(grcol) = tsq.get_groupby_column() { @@ -154,12 +154,12 @@ impl TimeSeriesQueryToSQLTransformer<'_> { pub fn create_query_nested( &self, - tsq: &TimeSeriesQuery, + tsq: &TimeseriesQuery, project_date_partition: bool, - ) -> Result<(SelectStatement, HashSet), TimeSeriesQueryToSQLError> { + ) -> Result<(SelectStatement, HashSet), TimeseriesQueryToSQLError> { match tsq { - TimeSeriesQuery::Basic(b) => self.create_basic_select(b, project_date_partition), - TimeSeriesQuery::Filtered(tsq, filter) => { + TimeseriesQuery::Basic(b) => self.create_basic_select(b, project_date_partition), + TimeseriesQuery::Filtered(tsq, filter) => { let (se, need_partition_columns) = self.create_filter_expressions( filter, Some( @@ -175,7 +175,7 @@ impl TimeSeriesQueryToSQLTransformer<'_> { let (select, mut columns) = self .create_query_nested(tsq, need_partition_columns || project_date_partition)?; - let wraps_inner = if let TimeSeriesQuery::Basic(_) = **tsq { + let wraps_inner = if let TimeseriesQuery::Basic(_) = **tsq { true } else { false @@ -206,7 +206,7 @@ impl TimeSeriesQueryToSQLTransformer<'_> { Ok((use_select, columns)) } - TimeSeriesQuery::InnerSynchronized(inner, synchronizers) => { + TimeseriesQuery::InnerSynchronized(inner, synchronizers) => { if synchronizers.iter().all(|x| { #[allow(irrefutable_let_patterns)] if let Synchronizer::Identity(_) = x { @@ -229,16 +229,16 @@ impl TimeSeriesQueryToSQLTransformer<'_> { todo!("Not implemented yet") } } - TimeSeriesQuery::Grouped(grouped) => self.create_grouped_query( + TimeseriesQuery::Grouped(grouped) => self.create_grouped_query( &grouped.tsq, &grouped.by, &grouped.aggregations, project_date_partition, ), - TimeSeriesQuery::GroupedBasic(btsq, df, col) => { + TimeseriesQuery::GroupedBasic(btsq, df, col) => { self.create_grouped_basic(btsq, project_date_partition, df, col) } - TimeSeriesQuery::ExpressionAs(tsq, v, e) => { + TimeseriesQuery::ExpressionAs(tsq, v, e) => { self.create_expression_as(tsq, project_date_partition, v, e) } } @@ -246,11 +246,11 @@ impl TimeSeriesQueryToSQLTransformer<'_> { fn create_expression_as( &self, - tsq: &TimeSeriesQuery, + tsq: &TimeseriesQuery, project_date_partition: bool, v: &Variable, e: &Expression, - ) -> Result<(SelectStatement, HashSet), TimeSeriesQueryToSQLError> { + ) -> Result<(SelectStatement, HashSet), TimeseriesQueryToSQLError> { let subquery_alias = "subquery"; let subquery_name = Name::Table(subquery_alias.to_string()); let mut expr_transformer = @@ -290,11 +290,11 @@ impl TimeSeriesQueryToSQLTransformer<'_> { fn create_grouped_basic( &self, - btsq: &BasicTimeSeriesQuery, + btsq: &BasicTimeseriesQuery, project_date_partition: bool, df: &DataFrame, column_name: &String, - ) -> Result<(SelectStatement, HashSet), TimeSeriesQueryToSQLError> { + ) -> Result<(SelectStatement, HashSet), TimeseriesQueryToSQLError> { let mut value_tuples = vec![]; let identifier_colname = btsq.identifier_variable.as_ref().unwrap().as_str(); let mut identifier_iter = df.column(identifier_colname).unwrap().iter(); @@ -413,9 +413,9 @@ impl TimeSeriesQueryToSQLTransformer<'_> { fn create_basic_select( &self, - btsq: &BasicTimeSeriesQuery, + btsq: &BasicTimeseriesQuery, project_date_partition: bool, - ) -> Result<(SelectStatement, HashSet), TimeSeriesQueryToSQLError> { + ) -> Result<(SelectStatement, HashSet), TimeseriesQueryToSQLError> { let table = self.find_right_table(btsq)?; let (select, columns) = table.create_basic_query(btsq, project_date_partition)?; @@ -498,8 +498,8 @@ impl TimeSeriesQueryToSQLTransformer<'_> { fn find_right_table<'a>( &'a self, - btsq: &BasicTimeSeriesQuery, - ) -> Result<&'a TimeSeriesTable, TimeSeriesQueryToSQLError> { + btsq: &BasicTimeseriesQuery, + ) -> Result<&'a TimeseriesTable, TimeseriesQueryToSQLError> { if let Some(resource) = &btsq.resource { for table in self.tables { if &table.resource_name == resource { @@ -511,12 +511,12 @@ impl TimeSeriesQueryToSQLTransformer<'_> { .iter() .map(|x| x.resource_name.clone()) .collect(); - Err(TimeSeriesQueryToSQLError::TimeseriesResourceNotFound( + Err(TimeseriesQueryToSQLError::TimeseriesResourceNotFound( resource.clone(), alternatives, )) } else { - Err(TimeSeriesQueryToSQLError::MissingTimeseriesResource) + Err(TimeseriesQueryToSQLError::MissingTimeseriesResource) } } @@ -524,7 +524,7 @@ impl TimeSeriesQueryToSQLTransformer<'_> { &self, expression: &Expression, timestamp_column: Option<&String>, - ) -> Result<(SimpleExpr, bool), TimeSeriesQueryToSQLError> { + ) -> Result<(SimpleExpr, bool), TimeseriesQueryToSQLError> { let mut transformer = self.create_transformer(None, self.database_type.clone()); let mut se = transformer.sparql_expression_to_sql_expression(expression)?; let mut partitioned = false; @@ -544,11 +544,11 @@ impl TimeSeriesQueryToSQLTransformer<'_> { fn create_grouped_query( &self, - inner_tsq: &TimeSeriesQuery, + inner_tsq: &TimeseriesQuery, by: &Vec, aggregations: &Vec<(Variable, AggregateExpression)>, project_date_partition: bool, - ) -> Result<(SelectStatement, HashSet), TimeSeriesQueryToSQLError> { + ) -> Result<(SelectStatement, HashSet), TimeseriesQueryToSQLError> { //Inner query timeseries functions: let inner_query_str = "inner_query"; let inner_query_name = Name::Table(inner_query_str.to_string()); @@ -639,12 +639,12 @@ impl TimeSeriesQueryToSQLTransformer<'_> { } } -impl TimeSeriesTable { +impl TimeseriesTable { pub fn create_basic_query( &self, - btsq: &BasicTimeSeriesQuery, + btsq: &BasicTimeseriesQuery, project_date_partition: bool, - ) -> Result<(SelectStatement, HashSet), TimeSeriesQueryToSQLError> { + ) -> Result<(SelectStatement, HashSet), TimeseriesQueryToSQLError> { let mut basic_query = Query::select(); let mut variable_column_name_map = HashMap::new(); variable_column_name_map.insert( @@ -729,7 +729,7 @@ impl TimeSeriesTable { } } -fn check_partitioning_support(tables: &Vec) -> bool { +fn check_partitioning_support(tables: &Vec) -> bool { tables .iter() .all(|x| x.day_column.is_some() && x.month_column.is_some() && x.day_column.is_some()) @@ -739,11 +739,11 @@ fn check_partitioning_support(tables: &Vec) -> bool { mod tests { use crate::query_context::{Context, VariableInContext}; use crate::timeseries_database::timeseries_sql_rewrite::{ - TimeSeriesQueryToSQLTransformer, TimeSeriesTable, + TimeseriesQueryToSQLTransformer, TimeseriesTable, }; use crate::timeseries_database::DatabaseType; use crate::timeseries_query::{ - BasicTimeSeriesQuery, GroupedTimeSeriesQuery, Synchronizer, TimeSeriesQuery, + BasicTimeseriesQuery, GroupedTimeseriesQuery, Synchronizer, TimeseriesQuery, }; use oxrdf::vocab::xsd; use oxrdf::{Literal, NamedNode, Variable}; @@ -756,7 +756,7 @@ mod tests { #[test] pub fn test_translate() { - let basic_tsq = BasicTimeSeriesQuery { + let basic_tsq = BasicTimeseriesQuery { identifier_variable: Some(Variable::new_unchecked("id")), timeseries_variable: Some(VariableInContext::new( Variable::new_unchecked("ts"), @@ -780,8 +780,8 @@ mod tests { )), ids: Some(vec!["A".to_string(), "B".to_string()]), }; - let tsq = TimeSeriesQuery::Filtered( - Box::new(TimeSeriesQuery::Basic(basic_tsq)), + let tsq = TimeseriesQuery::Filtered( + Box::new(TimeseriesQuery::Basic(basic_tsq)), Expression::LessOrEqual( Box::new(Expression::Variable(Variable::new_unchecked("t"))), Box::new(Expression::Literal(Literal::new_typed_literal( @@ -791,7 +791,7 @@ mod tests { ), ); - let table = TimeSeriesTable { + let table = TimeseriesTable { resource_name: "my_resource".into(), schema: Some("s3.ct-benchmark".into()), time_series_table: "timeseries_double".into(), @@ -804,7 +804,7 @@ mod tests { day_column: Some("dir2".to_string()), }; let tables = vec![table]; - let transformer = TimeSeriesQueryToSQLTransformer::new(&tables, DatabaseType::Dremio); + let transformer = TimeseriesQueryToSQLTransformer::new(&tables, DatabaseType::Dremio); let (sql_query, _) = transformer.create_query(&tsq, false).unwrap(); assert_eq!( &sql_query.to_string(PostgresQueryBuilder), @@ -814,17 +814,17 @@ mod tests { #[test] fn test_synchronized_grouped() { - let tsq = TimeSeriesQuery::Grouped(GroupedTimeSeriesQuery { - tsq: Box::new(TimeSeriesQuery::ExpressionAs( - Box::new(TimeSeriesQuery::ExpressionAs( - Box::new(TimeSeriesQuery::ExpressionAs( - Box::new(TimeSeriesQuery::ExpressionAs( - Box::new(TimeSeriesQuery::ExpressionAs( - Box::new(TimeSeriesQuery::Filtered( - Box::new(TimeSeriesQuery::InnerSynchronized( + let tsq = TimeseriesQuery::Grouped(GroupedTimeseriesQuery { + tsq: Box::new(TimeseriesQuery::ExpressionAs( + Box::new(TimeseriesQuery::ExpressionAs( + Box::new(TimeseriesQuery::ExpressionAs( + Box::new(TimeseriesQuery::ExpressionAs( + Box::new(TimeseriesQuery::ExpressionAs( + Box::new(TimeseriesQuery::Filtered( + Box::new(TimeseriesQuery::InnerSynchronized( vec![ - Box::new(TimeSeriesQuery::GroupedBasic( - BasicTimeSeriesQuery { + Box::new(TimeseriesQuery::GroupedBasic( + BasicTimeseriesQuery { identifier_variable: Some( Variable::new_unchecked("ts_external_id_1"), ), @@ -867,8 +867,8 @@ mod tests { .unwrap(), "grouping_col_0".to_string(), )), - Box::new(TimeSeriesQuery::GroupedBasic( - BasicTimeSeriesQuery { + Box::new(TimeseriesQuery::GroupedBasic( + BasicTimeseriesQuery { identifier_variable: Some( Variable::new_unchecked("ts_external_id_2"), ), @@ -1010,7 +1010,7 @@ mod tests { ], }); - let table = TimeSeriesTable { + let table = TimeseriesTable { resource_name: "my_resource".to_string(), schema: Some("s3.ct-benchmark".into()), time_series_table: "timeseries_double".into(), @@ -1023,7 +1023,7 @@ mod tests { day_column: Some("dir2".to_string()), }; let tables = vec![table]; - let transformer = TimeSeriesQueryToSQLTransformer::new(&tables, DatabaseType::Dremio); + let transformer = TimeseriesQueryToSQLTransformer::new(&tables, DatabaseType::Dremio); let (sql_query, _) = transformer.create_query(&tsq, false).unwrap(); let expected_str = r#"SELECT AVG("outer_query"."val_dir") AS "f7ca5ee9058effba8691ac9c642fbe95", AVG("outer_query"."val_speed") AS "990362f372e4019bc151c13baf0b50d5", "outer_query"."year" AS "year", "outer_query"."month" AS "month", "outer_query"."day" AS "day", "outer_query"."hour" AS "hour", "outer_query"."minute_10" AS "minute_10", "outer_query"."grouping_col_0" AS "grouping_col_0" FROM (SELECT "inner_query"."day" AS "day", "inner_query"."grouping_col_0" AS "grouping_col_0", "inner_query"."hour" AS "hour", "inner_query"."minute_10" AS "minute_10", "inner_query"."month" AS "month", "inner_query"."t" AS "t", "inner_query"."val_dir" AS "val_dir", "inner_query"."val_speed" AS "val_speed", "inner_query"."year" AS "year" FROM (SELECT "day" AS "day", "grouping_col_0" AS "grouping_col_0", "hour" AS "hour", "minute_10" AS "minute_10", "month" AS "month", "t" AS "t", "val_dir" AS "val_dir", "val_speed" AS "val_speed", "subquery"."year_partition_column_name" AS "year" FROM (SELECT "day" AS "day", "day_partition_column_name" AS "day_partition_column_name", "grouping_col_0" AS "grouping_col_0", "hour" AS "hour", "minute_10" AS "minute_10", "month_partition_column_name" AS "month_partition_column_name", "t" AS "t", "val_dir" AS "val_dir", "val_speed" AS "val_speed", "year_partition_column_name" AS "year_partition_column_name", "subquery"."month_partition_column_name" AS "month" FROM (SELECT "day_partition_column_name" AS "day_partition_column_name", "grouping_col_0" AS "grouping_col_0", "hour" AS "hour", "minute_10" AS "minute_10", "month_partition_column_name" AS "month_partition_column_name", "t" AS "t", "val_dir" AS "val_dir", "val_speed" AS "val_speed", "year_partition_column_name" AS "year_partition_column_name", "subquery"."day_partition_column_name" AS "day" FROM (SELECT "day_partition_column_name" AS "day_partition_column_name", "grouping_col_0" AS "grouping_col_0", "minute_10" AS "minute_10", "month_partition_column_name" AS "month_partition_column_name", "t" AS "t", "val_dir" AS "val_dir", "val_speed" AS "val_speed", "year_partition_column_name" AS "year_partition_column_name", date_part('hour', "subquery"."t") AS "hour" FROM (SELECT "day_partition_column_name" AS "day_partition_column_name", "grouping_col_0" AS "grouping_col_0", "month_partition_column_name" AS "month_partition_column_name", "t" AS "t", "val_dir" AS "val_dir", "val_speed" AS "val_speed", "year_partition_column_name" AS "year_partition_column_name", CAST(FLOOR(date_part('minute', "subquery"."t") / 10) AS INTEGER) AS "minute_10" FROM (SELECT "first_query"."day_partition_column_name" AS "day_partition_column_name", "first_query"."grouping_col_0" AS "grouping_col_0", "first_query"."month_partition_column_name" AS "month_partition_column_name", "first_query"."t" AS "t", "first_query"."val_speed" AS "val_speed", "first_query"."year_partition_column_name" AS "year_partition_column_name", "other_0"."day_partition_column_name" AS "day_partition_column_name", "other_0"."grouping_col_0" AS "grouping_col_0", "other_0"."month_partition_column_name" AS "month_partition_column_name", "other_0"."val_dir" AS "val_dir", "other_0"."year_partition_column_name" AS "year_partition_column_name" FROM (SELECT "basic_query"."day_partition_column_name" AS "day_partition_column_name", "basic_query"."month_partition_column_name" AS "month_partition_column_name", "basic_query"."t" AS "t", "basic_query"."val_speed" AS "val_speed", "basic_query"."year_partition_column_name" AS "year_partition_column_name", "static_query"."grouping_col_0" AS "grouping_col_0" FROM (SELECT "timestamp" AS "t", "dir3" AS "ts_external_id_1", "value" AS "val_speed", CAST("dir2" AS INTEGER) AS "day_partition_column_name", CAST("dir1" AS INTEGER) AS "month_partition_column_name", CAST("dir0" AS INTEGER) AS "year_partition_column_name" FROM "s3.ct-benchmark"."timeseries_double" WHERE "dir3" IN ('id1')) AS "basic_query" INNER JOIN (SELECT "mapping"."EXPR$0" AS "ts_external_id_1", "mapping"."EXPR$1" AS "grouping_col_0" FROM (VALUES ('id1', 0)) AS "mapping") AS "static_query" ON "static_query"."ts_external_id_1" = "basic_query"."ts_external_id_1") AS "first_query" INNER JOIN (SELECT "basic_query"."day_partition_column_name" AS "day_partition_column_name", "basic_query"."month_partition_column_name" AS "month_partition_column_name", "basic_query"."t" AS "t", "basic_query"."val_dir" AS "val_dir", "basic_query"."year_partition_column_name" AS "year_partition_column_name", "static_query"."grouping_col_0" AS "grouping_col_0" FROM (SELECT "timestamp" AS "t", "dir3" AS "ts_external_id_2", "value" AS "val_dir", CAST("dir2" AS INTEGER) AS "day_partition_column_name", CAST("dir1" AS INTEGER) AS "month_partition_column_name", CAST("dir0" AS INTEGER) AS "year_partition_column_name" FROM "s3.ct-benchmark"."timeseries_double" WHERE "dir3" IN ('id2')) AS "basic_query" INNER JOIN (SELECT "mapping"."EXPR$0" AS "ts_external_id_2", "mapping"."EXPR$1" AS "grouping_col_0" FROM (VALUES ('id2', 1)) AS "mapping") AS "static_query" ON "static_query"."ts_external_id_2" = "basic_query"."ts_external_id_2") AS "other_0" ON "first_query"."grouping_col_0" = "other_0"."grouping_col_0" AND "first_query"."t" = "other_0"."t" AND "first_query"."year_partition_column_name" = "other_0"."year_partition_column_name" AND "first_query"."month_partition_column_name" = "other_0"."month_partition_column_name" AND "first_query"."day_partition_column_name" = "other_0"."day_partition_column_name" WHERE ("year_partition_column_name" > 2022 OR ("year_partition_column_name" = 2022 AND "month_partition_column_name" > 8) OR ("year_partition_column_name" = 2022 AND "month_partition_column_name" = 8 AND "day_partition_column_name" > 30) OR ("year_partition_column_name" = 2022 AND "month_partition_column_name" = 8 AND "day_partition_column_name" = 30 AND "t" >= '2022-08-30 08:46:53')) AND ("year_partition_column_name" < 2022 OR ("year_partition_column_name" = 2022 AND "month_partition_column_name" < 8) OR ("year_partition_column_name" = 2022 AND "month_partition_column_name" = 8 AND "day_partition_column_name" < 30) OR ("year_partition_column_name" = 2022 AND "month_partition_column_name" = 8 AND "day_partition_column_name" = 30 AND "t" <= '2022-08-30 21:46:53'))) AS "subquery") AS "subquery") AS "subquery") AS "subquery") AS "subquery") AS "inner_query") AS "outer_query" GROUP BY "outer_query"."year", "outer_query"."month", "outer_query"."day", "outer_query"."hour", "outer_query"."minute_10", "outer_query"."grouping_col_0" ORDER BY "grouping_col_0" ASC"#; diff --git a/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite.rs b/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite.rs index 7e6685e..5713f59 100644 --- a/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite.rs +++ b/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite.rs @@ -7,7 +7,7 @@ use sea_query::{Expr as SeaExpr, Func}; use spargebra::algebra::Expression; use crate::constants::DATETIME_AS_SECONDS; -use crate::timeseries_database::timeseries_sql_rewrite::{Name, TimeSeriesQueryToSQLError}; +use crate::timeseries_database::timeseries_sql_rewrite::{Name, TimeseriesQueryToSQLError}; use crate::timeseries_database::DatabaseType; pub mod aggregate_expressions; @@ -42,7 +42,7 @@ impl SPARQLToSQLExpressionTransformer<'_> { pub(crate) fn sparql_expression_to_sql_expression( &mut self, e: &Expression, - ) -> Result { + ) -> Result { Ok(match e { Expression::Or(left, right) => self .sparql_expression_to_sql_expression(left)? @@ -70,7 +70,7 @@ impl SPARQLToSQLExpressionTransformer<'_> { } } _ => { - return Err(TimeSeriesQueryToSQLError::UnknownDatatype( + return Err(TimeseriesQueryToSQLError::UnknownDatatype( l.datatype().as_str().to_string(), )); } @@ -117,7 +117,7 @@ impl SPARQLToSQLExpressionTransformer<'_> { } else if let Err(e) = v { return Err(e); } else { - return Err(TimeSeriesQueryToSQLError::FoundNonValueInInExpression); + return Err(TimeseriesQueryToSQLError::FoundNonValueInInExpression); } } SeaExpr::expr(self.sparql_expression_to_sql_expression(left)?) diff --git a/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite/aggregate_expressions.rs b/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite/aggregate_expressions.rs index 47dbb28..2559ec5 100644 --- a/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite/aggregate_expressions.rs +++ b/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite/aggregate_expressions.rs @@ -1,5 +1,5 @@ use super::SPARQLToSQLExpressionTransformer; -use crate::timeseries_database::timeseries_sql_rewrite::TimeSeriesQueryToSQLError; +use crate::timeseries_database::timeseries_sql_rewrite::TimeseriesQueryToSQLError; use sea_query::{Func, SimpleExpr}; use spargebra::algebra::AggregateExpression; @@ -8,7 +8,7 @@ impl SPARQLToSQLExpressionTransformer<'_> { pub(crate) fn sparql_aggregate_expression_to_sql_expression( &mut self, agg: &AggregateExpression, - ) -> Result { + ) -> Result { Ok(match agg { AggregateExpression::Count { expr, distinct: _ } => { if let Some(some_expr) = expr { diff --git a/chrontext/src/timeseries_query.rs b/chrontext/src/timeseries_query.rs index f50268e..445d3c0 100644 --- a/chrontext/src/timeseries_query.rs +++ b/chrontext/src/timeseries_query.rs @@ -13,13 +13,13 @@ use std::error::Error; use std::fmt::{Display, Formatter}; #[derive(Debug, Clone, PartialEq)] -pub enum TimeSeriesQuery { - Basic(BasicTimeSeriesQuery), - GroupedBasic(BasicTimeSeriesQuery, DataFrame, String), - Filtered(Box, Expression), - InnerSynchronized(Vec>, Vec), - ExpressionAs(Box, Variable, Expression), - Grouped(GroupedTimeSeriesQuery), +pub enum TimeseriesQuery { + Basic(BasicTimeseriesQuery), + GroupedBasic(BasicTimeseriesQuery, DataFrame, String), + Filtered(Box, Expression), + InnerSynchronized(Vec>, Vec), + ExpressionAs(Box, Variable, Expression), + Grouped(GroupedTimeseriesQuery), } #[derive(Debug, Clone, PartialEq)] @@ -28,15 +28,15 @@ pub enum Synchronizer { } #[derive(Debug, Clone, PartialEq)] -pub struct GroupedTimeSeriesQuery { +pub struct GroupedTimeseriesQuery { pub context: Context, //TODO: Fix this workaround properly - pub tsq: Box, + pub tsq: Box, pub by: Vec, pub aggregations: Vec<(Variable, AggregateExpression)>, } #[derive(Debug, Clone, PartialEq)] -pub struct BasicTimeSeriesQuery { +pub struct BasicTimeseriesQuery { pub identifier_variable: Option, pub timeseries_variable: Option, pub data_point_variable: Option, @@ -49,7 +49,7 @@ pub struct BasicTimeSeriesQuery { pub ids: Option>, } -impl BasicTimeSeriesQuery { +impl BasicTimeseriesQuery { fn expected_columns(&self) -> HashSet<&str> { let mut expected_columns = HashSet::new(); expected_columns.insert(self.identifier_variable.as_ref().unwrap().as_str()); @@ -64,12 +64,12 @@ impl BasicTimeSeriesQuery { } #[derive(Debug)] -pub struct TimeSeriesValidationError { +pub struct TimeseriesValidationError { missing_columns: Vec, extra_columns: Vec, } -impl Display for TimeSeriesValidationError { +impl Display for TimeseriesValidationError { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { write!( f, @@ -80,14 +80,14 @@ impl Display for TimeSeriesValidationError { } } -impl Error for TimeSeriesValidationError {} +impl Error for TimeseriesValidationError {} -impl TimeSeriesQuery { - pub(crate) fn validate(&self, df: &DataFrame) -> Result<(), TimeSeriesValidationError> { +impl TimeseriesQuery { + pub(crate) fn validate(&self, df: &DataFrame) -> Result<(), TimeseriesValidationError> { let expected_columns = self.expected_columns(); let df_columns: HashSet<&str> = df.get_column_names().into_iter().collect(); if expected_columns != df_columns { - let err = TimeSeriesValidationError { + let err = TimeseriesValidationError { missing_columns: expected_columns .difference(&df_columns) .map(|x| x.to_string()) @@ -105,15 +105,15 @@ impl TimeSeriesQuery { fn expected_columns<'a>(&'a self) -> HashSet<&'a str> { match self { - TimeSeriesQuery::Basic(b) => b.expected_columns(), - TimeSeriesQuery::Filtered(inner, ..) => inner.expected_columns(), - TimeSeriesQuery::InnerSynchronized(inners, _synchronizers) => { + TimeseriesQuery::Basic(b) => b.expected_columns(), + TimeseriesQuery::Filtered(inner, ..) => inner.expected_columns(), + TimeseriesQuery::InnerSynchronized(inners, _synchronizers) => { inners.iter().fold(HashSet::new(), |mut exp, tsq| { exp.extend(tsq.expected_columns()); exp }) } - TimeSeriesQuery::Grouped(g) => { + TimeseriesQuery::Grouped(g) => { let mut expected_columns = HashSet::new(); for (v, _) in &g.aggregations { expected_columns.insert(v.as_str()); @@ -131,13 +131,13 @@ impl TimeSeriesQuery { expected_columns.insert(grouping_col.unwrap().as_str()); expected_columns } - TimeSeriesQuery::GroupedBasic(b, _, c) => { + TimeseriesQuery::GroupedBasic(b, _, c) => { let mut expected = b.expected_columns(); expected.insert(c.as_str()); expected.remove(b.identifier_variable.as_ref().unwrap().as_str()); expected } - TimeSeriesQuery::ExpressionAs(t, ..) => t.expected_columns(), + TimeseriesQuery::ExpressionAs(t, ..) => t.expected_columns(), } } @@ -156,117 +156,117 @@ impl TimeSeriesQuery { pub(crate) fn get_ids(&self) -> Vec<&String> { match self { - TimeSeriesQuery::Basic(b) => { + TimeseriesQuery::Basic(b) => { if let Some(ids) = &b.ids { ids.iter().collect() } else { vec![] } } - TimeSeriesQuery::Filtered(inner, _) => inner.get_ids(), - TimeSeriesQuery::InnerSynchronized(inners, _) => { + TimeseriesQuery::Filtered(inner, _) => inner.get_ids(), + TimeseriesQuery::InnerSynchronized(inners, _) => { let mut ss = vec![]; for inner in inners { ss.extend(inner.get_ids()) } ss } - TimeSeriesQuery::Grouped(grouped) => grouped.tsq.get_ids(), - TimeSeriesQuery::GroupedBasic(b, ..) => { + TimeseriesQuery::Grouped(grouped) => grouped.tsq.get_ids(), + TimeseriesQuery::GroupedBasic(b, ..) => { if let Some(ids) = &b.ids { ids.iter().collect() } else { vec![] } } - TimeSeriesQuery::ExpressionAs(tsq, ..) => tsq.get_ids(), + TimeseriesQuery::ExpressionAs(tsq, ..) => tsq.get_ids(), } } pub(crate) fn get_value_variables(&self) -> Vec<&VariableInContext> { match self { - TimeSeriesQuery::Basic(b) => { + TimeseriesQuery::Basic(b) => { if let Some(val_var) = &b.value_variable { vec![val_var] } else { vec![] } } - TimeSeriesQuery::Filtered(inner, _) => inner.get_value_variables(), - TimeSeriesQuery::InnerSynchronized(inners, _) => { + TimeseriesQuery::Filtered(inner, _) => inner.get_value_variables(), + TimeseriesQuery::InnerSynchronized(inners, _) => { let mut vs = vec![]; for inner in inners { vs.extend(inner.get_value_variables()) } vs } - TimeSeriesQuery::Grouped(grouped) => grouped.tsq.get_value_variables(), - TimeSeriesQuery::GroupedBasic(b, ..) => { + TimeseriesQuery::Grouped(grouped) => grouped.tsq.get_value_variables(), + TimeseriesQuery::GroupedBasic(b, ..) => { if let Some(val_var) = &b.value_variable { vec![val_var] } else { vec![] } } - TimeSeriesQuery::ExpressionAs(t, ..) => t.get_value_variables(), + TimeseriesQuery::ExpressionAs(t, ..) => t.get_value_variables(), } } pub(crate) fn get_identifier_variables(&self) -> Vec<&Variable> { match self { - TimeSeriesQuery::Basic(b) => { + TimeseriesQuery::Basic(b) => { if let Some(id_var) = &b.identifier_variable { vec![id_var] } else { vec![] } } - TimeSeriesQuery::Filtered(inner, _) => inner.get_identifier_variables(), - TimeSeriesQuery::InnerSynchronized(inners, _) => { + TimeseriesQuery::Filtered(inner, _) => inner.get_identifier_variables(), + TimeseriesQuery::InnerSynchronized(inners, _) => { let mut vs = vec![]; for inner in inners { vs.extend(inner.get_identifier_variables()) } vs } - TimeSeriesQuery::Grouped(grouped) => grouped.tsq.get_identifier_variables(), - TimeSeriesQuery::GroupedBasic(b, ..) => { + TimeseriesQuery::Grouped(grouped) => grouped.tsq.get_identifier_variables(), + TimeseriesQuery::GroupedBasic(b, ..) => { if let Some(id_var) = &b.identifier_variable { vec![id_var] } else { vec![] } } - TimeSeriesQuery::ExpressionAs(t, ..) => t.get_identifier_variables(), + TimeseriesQuery::ExpressionAs(t, ..) => t.get_identifier_variables(), } } pub(crate) fn get_datatype_variables(&self) -> Vec<&Variable> { match self { - TimeSeriesQuery::Basic(b) => { + TimeseriesQuery::Basic(b) => { if let Some(dt_var) = &b.datatype_variable { vec![dt_var] } else { vec![] } } - TimeSeriesQuery::Filtered(inner, _) => inner.get_datatype_variables(), - TimeSeriesQuery::InnerSynchronized(inners, _) => { + TimeseriesQuery::Filtered(inner, _) => inner.get_datatype_variables(), + TimeseriesQuery::InnerSynchronized(inners, _) => { let mut vs = vec![]; for inner in inners { vs.extend(inner.get_datatype_variables()) } vs } - TimeSeriesQuery::Grouped(grouped) => grouped.tsq.get_datatype_variables(), - TimeSeriesQuery::GroupedBasic(b, ..) => { + TimeseriesQuery::Grouped(grouped) => grouped.tsq.get_datatype_variables(), + TimeseriesQuery::GroupedBasic(b, ..) => { if let Some(dt_var) = &b.datatype_variable { vec![dt_var] } else { vec![] } } - TimeSeriesQuery::ExpressionAs(t, ..) => t.get_datatype_variables(), + TimeseriesQuery::ExpressionAs(t, ..) => t.get_datatype_variables(), } } @@ -285,37 +285,37 @@ impl TimeSeriesQuery { pub(crate) fn get_timestamp_variables(&self) -> Vec<&VariableInContext> { match self { - TimeSeriesQuery::Basic(b) => { + TimeseriesQuery::Basic(b) => { if let Some(v) = &b.timestamp_variable { vec![v] } else { vec![] } } - TimeSeriesQuery::Filtered(t, _) => t.get_timestamp_variables(), - TimeSeriesQuery::InnerSynchronized(ts, _) => { + TimeseriesQuery::Filtered(t, _) => t.get_timestamp_variables(), + TimeseriesQuery::InnerSynchronized(ts, _) => { let mut vs = vec![]; for t in ts { vs.extend(t.get_timestamp_variables()) } vs } - TimeSeriesQuery::Grouped(grouped) => grouped.tsq.get_timestamp_variables(), - TimeSeriesQuery::GroupedBasic(b, ..) => { + TimeseriesQuery::Grouped(grouped) => grouped.tsq.get_timestamp_variables(), + TimeseriesQuery::GroupedBasic(b, ..) => { if let Some(v) = &b.timestamp_variable { vec![v] } else { vec![] } } - TimeSeriesQuery::ExpressionAs(t, ..) => t.get_timestamp_variables(), + TimeseriesQuery::ExpressionAs(t, ..) => t.get_timestamp_variables(), } } } -impl BasicTimeSeriesQuery { - pub fn new_empty() -> BasicTimeSeriesQuery { - BasicTimeSeriesQuery { +impl BasicTimeseriesQuery { + pub fn new_empty() -> BasicTimeseriesQuery { + BasicTimeseriesQuery { identifier_variable: None, timeseries_variable: None, data_point_variable: None, @@ -330,13 +330,13 @@ impl BasicTimeSeriesQuery { } } -impl TimeSeriesQuery { +impl TimeseriesQuery { pub fn get_groupby_column(&self) -> Option<&String> { match self { - TimeSeriesQuery::Basic(..) => None, - TimeSeriesQuery::GroupedBasic(_, _, colname) => Some(colname), - TimeSeriesQuery::Filtered(tsq, _) => tsq.get_groupby_column(), - TimeSeriesQuery::InnerSynchronized(tsqs, _) => { + TimeseriesQuery::Basic(..) => None, + TimeseriesQuery::GroupedBasic(_, _, colname) => Some(colname), + TimeseriesQuery::Filtered(tsq, _) => tsq.get_groupby_column(), + TimeseriesQuery::InnerSynchronized(tsqs, _) => { let mut colname = None; for tsq in tsqs { let new_colname = tsq.get_groupby_column(); @@ -349,17 +349,17 @@ impl TimeSeriesQuery { } colname } - TimeSeriesQuery::ExpressionAs(tsq, ..) => tsq.get_groupby_column(), - TimeSeriesQuery::Grouped(grouped) => grouped.tsq.get_groupby_column(), + TimeseriesQuery::ExpressionAs(tsq, ..) => tsq.get_groupby_column(), + TimeseriesQuery::Grouped(grouped) => grouped.tsq.get_groupby_column(), } } pub fn get_groupby_mapping_df(&self) -> Option<&DataFrame> { match self { - TimeSeriesQuery::Basic(..) => None, - TimeSeriesQuery::GroupedBasic(_, df, _) => Some(df), - TimeSeriesQuery::Filtered(tsq, _) => tsq.get_groupby_mapping_df(), - TimeSeriesQuery::InnerSynchronized(tsqs, _) => { + TimeseriesQuery::Basic(..) => None, + TimeseriesQuery::GroupedBasic(_, df, _) => Some(df), + TimeseriesQuery::Filtered(tsq, _) => tsq.get_groupby_mapping_df(), + TimeseriesQuery::InnerSynchronized(tsqs, _) => { let mut colname = None; for tsq in tsqs { let new_colname = tsq.get_groupby_mapping_df(); @@ -372,28 +372,28 @@ impl TimeSeriesQuery { } colname } - TimeSeriesQuery::ExpressionAs(tsq, ..) => tsq.get_groupby_mapping_df(), - TimeSeriesQuery::Grouped(grouped) => grouped.tsq.get_groupby_mapping_df(), + TimeseriesQuery::ExpressionAs(tsq, ..) => tsq.get_groupby_mapping_df(), + TimeseriesQuery::Grouped(grouped) => grouped.tsq.get_groupby_mapping_df(), } } pub fn get_timeseries_functions(&self, context: &Context) -> Vec<(&Variable, &Expression)> { match self { - TimeSeriesQuery::Basic(..) => { + TimeseriesQuery::Basic(..) => { vec![] } - TimeSeriesQuery::GroupedBasic(..) => { + TimeseriesQuery::GroupedBasic(..) => { vec![] } - TimeSeriesQuery::Filtered(tsq, _) => tsq.get_timeseries_functions(context), - TimeSeriesQuery::InnerSynchronized(tsqs, _) => { + TimeseriesQuery::Filtered(tsq, _) => tsq.get_timeseries_functions(context), + TimeseriesQuery::InnerSynchronized(tsqs, _) => { let mut out_tsfs = vec![]; for tsq in tsqs { out_tsfs.extend(tsq.get_timeseries_functions(context)) } out_tsfs } - TimeSeriesQuery::ExpressionAs(tsq, v, e) => { + TimeseriesQuery::ExpressionAs(tsq, v, e) => { let mut tsfs = vec![]; let mut used_vars = HashSet::new(); find_all_used_variables_in_expression(e, &mut used_vars); @@ -413,13 +413,13 @@ impl TimeSeriesQuery { tsfs.extend(tsq.get_timeseries_functions(context)); tsfs } - TimeSeriesQuery::Grouped(tsq, ..) => tsq.tsq.get_timeseries_functions(context), + TimeseriesQuery::Grouped(tsq, ..) => tsq.tsq.get_timeseries_functions(context), } } pub fn get_datatype_map(&self) -> HashMap { match self { - TimeSeriesQuery::Basic(b) => { + TimeseriesQuery::Basic(b) => { let mut map = HashMap::new(); if let Some(tsv) = &b.timestamp_variable { map.insert( @@ -435,7 +435,7 @@ impl TimeSeriesQuery { } map } - TimeSeriesQuery::GroupedBasic(b, ..) => HashMap::from([( + TimeseriesQuery::GroupedBasic(b, ..) => HashMap::from([( b.value_variable .as_ref() .unwrap() @@ -444,15 +444,15 @@ impl TimeSeriesQuery { .to_string(), b.datatype.as_ref().unwrap().clone(), )]), - TimeSeriesQuery::Filtered(tsq, _) => tsq.get_datatype_map(), - TimeSeriesQuery::InnerSynchronized(tsqs, _) => { + TimeseriesQuery::Filtered(tsq, _) => tsq.get_datatype_map(), + TimeseriesQuery::InnerSynchronized(tsqs, _) => { let mut map = HashMap::new(); for tsq in tsqs { map.extend(tsq.get_datatype_map()); } map } - TimeSeriesQuery::ExpressionAs(tsq, v, e) => { + TimeseriesQuery::ExpressionAs(tsq, v, e) => { let v_str = v.as_str(); let mut map = tsq.get_datatype_map(); let mut used_vars = HashSet::new(); @@ -467,7 +467,7 @@ impl TimeSeriesQuery { } map } - TimeSeriesQuery::Grouped(gr) => { + TimeseriesQuery::Grouped(gr) => { let mut map = gr.tsq.get_datatype_map(); for (v, agg) in gr.aggregations.iter().rev() { let v_str = v.as_str(); diff --git a/chrontext/tests/query_execution_arrow_sql.rs b/chrontext/tests/query_execution_arrow_sql.rs index a9c3a60..f201f4e 100644 --- a/chrontext/tests/query_execution_arrow_sql.rs +++ b/chrontext/tests/query_execution_arrow_sql.rs @@ -11,7 +11,7 @@ use chrontext::engine::Engine; use chrontext::pushdown_setting::all_pushdowns; use chrontext::sparql_database::sparql_endpoint::SparqlEndpoint; use chrontext::timeseries_database::arrow_flight_sql_database::ArrowFlightSQLDatabase; -use chrontext::timeseries_database::timeseries_sql_rewrite::TimeSeriesTable; +use chrontext::timeseries_database::timeseries_sql_rewrite::TimeseriesTable; use futures_util::stream::StreamExt; use log::debug; use oxrdf::vocab::xsd; @@ -190,8 +190,8 @@ async fn with_sparql_testdata(#[future] sparql_endpoint: (), mut shared_testdata } #[fixture] -fn timeseries_table() -> TimeSeriesTable { - TimeSeriesTable { +fn timeseries_table() -> TimeseriesTable { + TimeseriesTable { resource_name: "my_resource".into(), schema: Some("my_nas".to_string()), time_series_table: "ts.parquet".to_string(), @@ -205,7 +205,7 @@ fn timeseries_table() -> TimeSeriesTable { } } -async fn ts_sql_db(timeseries_table: TimeSeriesTable) -> ArrowFlightSQLDatabase { +async fn ts_sql_db(timeseries_table: TimeseriesTable) -> ArrowFlightSQLDatabase { ArrowFlightSQLDatabase::new( ARROW_SQL_DATABASE_ENDPOINT, "dremio", @@ -314,7 +314,7 @@ async fn with_timeseries_testdata(#[future] arrow_sql_endpoint: ()) { async fn test_simple_hybrid_query( #[future] with_sparql_testdata: (), #[future] with_timeseries_testdata: (), - timeseries_table: TimeSeriesTable, + timeseries_table: TimeseriesTable, shared_testdata_path: PathBuf, use_logger: (), ) { diff --git a/chrontext/tests/rewrites.rs b/chrontext/tests/rewrites.rs index 6b87321..adcb0a1 100644 --- a/chrontext/tests/rewrites.rs +++ b/chrontext/tests/rewrites.rs @@ -2,7 +2,7 @@ use chrontext::preprocessing::Preprocessor; use chrontext::query_context::{Context, PathEntry, VariableInContext}; use chrontext::rewriting::StaticQueryRewriter; use chrontext::splitter::parse_sparql_select_query; -use chrontext::timeseries_query::BasicTimeSeriesQuery; +use chrontext::timeseries_query::BasicTimeseriesQuery; use spargebra::term::Variable; use spargebra::Query; @@ -369,7 +369,7 @@ fn test_fix_dropped_triple() { let expected_query = Query::parse(expected_str, None).unwrap(); assert_eq!(static_rewrite, &expected_query); - let expected_time_series_queries = vec![BasicTimeSeriesQuery { + let expected_time_series_queries = vec![BasicTimeseriesQuery { identifier_variable: Some(Variable::new_unchecked("ts_external_id_0")), timeseries_variable: Some(VariableInContext::new( Variable::new_unchecked("ts"), @@ -451,7 +451,7 @@ fn test_property_path_expression() { "#; let expected_query = Query::parse(expected_str, None).unwrap(); let expected_time_series_queries = vec![ - BasicTimeSeriesQuery { + BasicTimeseriesQuery { identifier_variable: Some(Variable::new_unchecked("ts_external_id_0")), timeseries_variable: Some(VariableInContext::new( Variable::new_unchecked("blank_replacement_0"), @@ -491,7 +491,7 @@ fn test_property_path_expression() { )), ids: None, }, - BasicTimeSeriesQuery { + BasicTimeseriesQuery { identifier_variable: Some(Variable::new_unchecked("ts_external_id_1")), timeseries_variable: Some(VariableInContext::new( Variable::new_unchecked("blank_replacement_1"), diff --git a/py_chrontext/Cargo.toml b/py_chrontext/Cargo.toml index ef26e24..27d8190 100644 --- a/py_chrontext/Cargo.toml +++ b/py_chrontext/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "py_chrontext" -version = "0.5.1" +version = "0.5.2" edition = "2021" [workspace] diff --git a/py_chrontext/src/errors.rs b/py_chrontext/src/errors.rs index ff03b89..ad3b0d4 100644 --- a/py_chrontext/src/errors.rs +++ b/py_chrontext/src/errors.rs @@ -15,14 +15,14 @@ pub enum PyQueryError { #[error(transparent)] QueryExecutionError(Box), #[error("Missing time series database")] - MissingTimeSeriesDatabaseError, + MissingTimeseriesDatabaseError, #[error("Missing sparql database")] MissingSPARQLDatabaseError, #[error("Time series database defined multiple times")] - MultipleTimeSeriesDatabases, + MultipleTimeseriesDatabases, #[error("Sparql database defined multiple times")] MultipleSPARQLDatabases, - #[error("Oxigraph storage error `{0}`")] + #[error("Oxigraph storage error. Perhaps another variable in a notebook is holding the lock on your database? Try setting e.g. engine=None first. \nThe error message was:`{0}`")] OxigraphStorageError(StorageError), #[error("Read ntriples file error `{0}`")] ReadNTriplesFileError(io::Error), @@ -42,10 +42,10 @@ impl std::convert::From for PyErr { PyQueryError::QueryExecutionError(err) => { QueryExecutionError::new_err(format!("{}", err)) } - PyQueryError::MissingTimeSeriesDatabaseError => { - MissingTimeSeriesDatabaseError::new_err("") + PyQueryError::MissingTimeseriesDatabaseError => { + MissingTimeseriesDatabaseError::new_err("") } - PyQueryError::MultipleTimeSeriesDatabases => MultipleTimeSeriesDatabases::new_err(""), + PyQueryError::MultipleTimeseriesDatabases => MultipleTimeseriesDatabases::new_err(""), PyQueryError::OxigraphStorageError(o) => { OxigraphStorageError::new_err(format!("{}", o)) } @@ -62,8 +62,8 @@ impl std::convert::From for PyErr { create_exception!(exceptions, ArrowFlightSQLError, PyException); create_exception!(exceptions, DatatypeIRIParseError, PyException); create_exception!(exceptions, QueryExecutionError, PyException); -create_exception!(exceptions, MissingTimeSeriesDatabaseError, PyException); -create_exception!(exceptions, MultipleTimeSeriesDatabases, PyException); +create_exception!(exceptions, MissingTimeseriesDatabaseError, PyException); +create_exception!(exceptions, MultipleTimeseriesDatabases, PyException); create_exception!(exceptions, OxigraphStorageError, PyException); create_exception!(exceptions, ReadNTriplesFileError, PyException); create_exception!(exceptions, OxigraphLoaderError, PyException); diff --git a/py_chrontext/src/lib.rs b/py_chrontext/src/lib.rs index 102d1db..8ad7b26 100644 --- a/py_chrontext/src/lib.rs +++ b/py_chrontext/src/lib.rs @@ -54,8 +54,8 @@ use chrontext::sparql_database::SparqlQueryable; use chrontext::timeseries_database::arrow_flight_sql_database::ArrowFlightSQLDatabase as RustArrowFlightSQLDatabase; use chrontext::timeseries_database::bigquery_database::BigQueryDatabase as RustBigQueryDatabase; use chrontext::timeseries_database::opcua_history_read::OPCUAHistoryRead as RustOPCUAHistoryRead; -use chrontext::timeseries_database::timeseries_sql_rewrite::TimeSeriesTable as RustTimeSeriesTable; -use chrontext::timeseries_database::TimeSeriesQueryable; +use chrontext::timeseries_database::timeseries_sql_rewrite::TimeseriesTable as RustTimeseriesTable; +use chrontext::timeseries_database::TimeseriesQueryable; use log::debug; use oxigraph::io::DatasetFormat; use oxrdf::{IriParseError, NamedNode}; @@ -103,10 +103,10 @@ timeseries_opcua_db: Optional[TimeseriesOPCUADatabase])")] } if num_ts == 0 { - return Err(PyQueryError::MissingTimeSeriesDatabaseError.into()); + return Err(PyQueryError::MissingTimeseriesDatabaseError.into()); } if num_ts > 1 { - return Err(PyQueryError::MultipleTimeSeriesDatabases.into()); + return Err(PyQueryError::MultipleTimeseriesDatabases.into()); } let mut engine = Engine { @@ -130,7 +130,7 @@ timeseries_opcua_db: Optional[TimeseriesOPCUADatabase])")] } else if let Some(db) = &self.timeseries_dremio_db { create_arrow_flight_sql(&db.clone())? } else { - return Err(PyQueryError::MissingTimeSeriesDatabaseError.into()); + return Err(PyQueryError::MissingTimeseriesDatabaseError.into()); }; let sparql_db = if self.engine.is_some() { @@ -218,7 +218,7 @@ pub struct TimeseriesDremioDatabase { port: u16, username: String, password: String, - tables: Vec, + tables: Vec, } #[pymethods] @@ -229,7 +229,7 @@ impl TimeseriesDremioDatabase { port: u16, username: String, password: String, - tables: Vec, + tables: Vec, ) -> TimeseriesDremioDatabase { TimeseriesDremioDatabase { username, @@ -244,14 +244,14 @@ impl TimeseriesDremioDatabase { #[pyclass] #[derive(Clone)] pub struct TimeseriesBigQueryDatabase { - tables: Vec, + tables: Vec, key: String, } #[pymethods] impl TimeseriesBigQueryDatabase { #[new] - pub fn new(tables: Vec, key: String) -> TimeseriesBigQueryDatabase { + pub fn new(tables: Vec, key: String) -> TimeseriesBigQueryDatabase { TimeseriesBigQueryDatabase { tables, key } } } @@ -276,7 +276,7 @@ impl TimeseriesOPCUADatabase { pub fn create_arrow_flight_sql( db: &TimeseriesDremioDatabase, -) -> PyResult<(HashSet, Box)> { +) -> PyResult<(HashSet, Box)> { let endpoint = format!("http://{}:{}", &db.host, &db.port); let mut new_tables = vec![]; for t in &db.tables { @@ -297,7 +297,7 @@ pub fn create_arrow_flight_sql( pub fn create_bigquery_database( db: &TimeseriesBigQueryDatabase, -) -> PyResult<(HashSet, Box)> { +) -> PyResult<(HashSet, Box)> { let mut new_tables = vec![]; for t in &db.tables { new_tables.push(t.to_rust_table().map_err(PyQueryError::from)?); @@ -312,7 +312,7 @@ pub fn create_bigquery_database( fn create_opcua_history_read( db: &TimeseriesOPCUADatabase, -) -> PyResult<(HashSet, Box)> { +) -> PyResult<(HashSet, Box)> { let actual_db = RustOPCUAHistoryRead::new(&db.endpoint, db.namespace); Ok(([PushdownSetting::GroupBy].into(), Box::new(actual_db))) } @@ -353,7 +353,7 @@ fn create_oxigraph(db: &SparqlEmbeddedOxigraph) -> PyResult PyResult, pub time_series_table: String, @@ -378,7 +378,7 @@ pub struct TimeSeriesTable { } #[pymethods] -impl TimeSeriesTable { +impl TimeseriesTable { #[new] pub fn new( resource_name: String, @@ -391,8 +391,8 @@ impl TimeSeriesTable { year_column: Option, month_column: Option, day_column: Option, - ) -> TimeSeriesTable { - TimeSeriesTable { + ) -> TimeseriesTable { + TimeseriesTable { resource_name, schema, time_series_table, @@ -407,9 +407,9 @@ impl TimeSeriesTable { } } -impl TimeSeriesTable { - fn to_rust_table(&self) -> Result { - Ok(RustTimeSeriesTable { +impl TimeseriesTable { + fn to_rust_table(&self) -> Result { + Ok(RustTimeseriesTable { resource_name: self.resource_name.clone(), schema: self.schema.clone(), time_series_table: self.time_series_table.clone(), @@ -441,7 +441,7 @@ fn _chrontext(_py: Python<'_>, m: &PyModule) -> PyResult<()> { } m.add_class::()?; - m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/py_chrontext/tests/test_arrow_flight_sql.py b/py_chrontext/tests/test_arrow_flight_sql.py index 0cd8340..0c2f62d 100644 --- a/py_chrontext/tests/test_arrow_flight_sql.py +++ b/py_chrontext/tests/test_arrow_flight_sql.py @@ -3,7 +3,7 @@ import pytest from SPARQLWrapper import SPARQLWrapper, POST, JSON -from chrontext import Engine, TimeseriesDremioDatabase, TimeSeriesTable, SparqlEmbeddedOxigraph +from chrontext import Engine, TimeseriesDremioDatabase, TimeseriesTable, SparqlEmbeddedOxigraph import polars as pl from polars.testing import assert_frame_equal @@ -32,7 +32,7 @@ def oxigraph_embedded(oxigraph_db): def test_simple_query(dremio_testdata, oxigraph_testdata): tables = [ - TimeSeriesTable( + TimeseriesTable( resource_name="my_resource", schema="my_nas", time_series_table="ts.parquet", @@ -64,7 +64,7 @@ def test_simple_query(dremio_testdata, oxigraph_testdata): def test_simple_query_embedded_oxigraph(dremio_testdata, oxigraph_embedded): tables = [ - TimeSeriesTable( + TimeseriesTable( resource_name="my_resource", schema="my_nas", time_series_table="ts.parquet", @@ -97,7 +97,7 @@ def test_simple_query_embedded_oxigraph(dremio_testdata, oxigraph_embedded): def test_simple_query_after_exception(dremio_testdata, oxigraph_testdata): tables = [ - TimeSeriesTable( + TimeseriesTable( resource_name="my_resource", schema="my_nas", time_series_table="ts.parquet", diff --git a/py_chrontext/tests/test_opcua.py b/py_chrontext/tests/test_opcua.py index 8e593e7..57fb396 100644 --- a/py_chrontext/tests/test_opcua.py +++ b/py_chrontext/tests/test_opcua.py @@ -13,7 +13,7 @@ from asyncua.ua import NodeId, String, Int16, DataValue, Variant from datetime import datetime -from chrontext import Engine, TimeseriesOPCUADatabase, TimeSeriesTable +from chrontext import Engine, TimeseriesOPCUADatabase, TimeseriesTable PATH_HERE = pathlib.Path(__file__).parent TESTDATA_PATH = PATH_HERE / "testdata"