From 6bbf39b34954ea20fb6d4150607747ca515b6a52 Mon Sep 17 00:00:00 2001 From: Magnus Bakken <10287813+magbak@users.noreply.github.com> Date: Thu, 7 Sep 2023 19:55:50 +0200 Subject: [PATCH] Appears to be working.. --- chrontext/Cargo.toml | 2 +- chrontext/src/timeseries_database.rs | 22 ++++- .../arrow_flight_sql_database.rs | 5 +- .../timeseries_database/bigquery_database.rs | 29 +++--- .../timeseries_sql_rewrite.rs | 93 +++++++++++++------ .../expression_rewrite.rs | 81 +++++++++------- .../aggregate_expressions.rs | 1 + .../partitioning_support.rs | 5 +- py_chrontext/src/lib.rs | 15 +-- 9 files changed, 165 insertions(+), 88 deletions(-) diff --git a/chrontext/Cargo.toml b/chrontext/Cargo.toml index 93b3f95..b976a3a 100644 --- a/chrontext/Cargo.toml +++ b/chrontext/Cargo.toml @@ -19,7 +19,7 @@ tokio-stream = "0.1.14" arrow2 = {version="0.17.3", features=["io_flight"]} arrow-format = {version="0.8.1", features=["flight-data", "flight-service"]} polars-core = "0.32.1" -sea-query = { version="0.30.1", features=["with-chrono"]} +sea-query = { git="https://github.com/DataTreehouse/sea-query", branch="feature/bigquery_basic_support", features=["with-chrono", "backend-bigquery"]} async-trait = "0.1.68" base64 = "0.13.0" opcua-client = "0.9.1" diff --git a/chrontext/src/timeseries_database.rs b/chrontext/src/timeseries_database.rs index b845a57..fcb9a96 100644 --- a/chrontext/src/timeseries_database.rs +++ b/chrontext/src/timeseries_database.rs @@ -11,7 +11,7 @@ use crate::timeseries_query::TimeSeriesQuery; use async_trait::async_trait; use log::debug; use polars::frame::DataFrame; -use sea_query::QueryBuilder; +use sea_query::{BigQueryQueryBuilder, PostgresQueryBuilder, QueryBuilder}; use std::error::Error; #[async_trait] @@ -20,17 +20,29 @@ pub trait TimeSeriesQueryable: Send { fn allow_compound_timeseries_queries(&self) -> bool; } +#[derive(Clone)] +pub enum DatabaseType { + BigQuery, + Dremio +} + pub trait TimeSeriesSQLQueryable { - fn get_sql_string( + fn 
get_sql_string( &self, tsq: &TimeSeriesQuery, - query_builder: T, + database_type:DatabaseType, ) -> Result { + + let query_string; { - let transformer = TimeSeriesQueryToSQLTransformer::new(&self.get_time_series_tables()); + let transformer = TimeSeriesQueryToSQLTransformer::new(&self.get_time_series_tables(), database_type.clone()); let (query, _) = transformer.create_query(tsq, false)?; - query_string = query.to_string(query_builder); + query_string = match database_type { + DatabaseType::BigQuery => {query.to_string(BigQueryQueryBuilder)} + DatabaseType::Dremio => {query.to_string(PostgresQueryBuilder)} + }; + debug!("SQL: {}", query_string); } Ok(query_string) diff --git a/chrontext/src/timeseries_database/arrow_flight_sql_database.rs b/chrontext/src/timeseries_database/arrow_flight_sql_database.rs index 907a7c2..37ef60b 100644 --- a/chrontext/src/timeseries_database/arrow_flight_sql_database.rs +++ b/chrontext/src/timeseries_database/arrow_flight_sql_database.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::timeseries_database::{TimeSeriesQueryable, TimeSeriesSQLQueryable}; +use crate::timeseries_database::{DatabaseType, TimeSeriesQueryable, TimeSeriesSQLQueryable}; use crate::timeseries_query::TimeSeriesQuery; use arrow2::io::flight as flight2; use arrow_format::flight::data::{FlightDescriptor, FlightInfo, HandshakeRequest}; @@ -30,7 +30,6 @@ use arrow_format::ipc::MessageHeaderRef; use log::{debug, warn}; use polars_core::error::ArrowError; use polars_core::prelude::PolarsError; -use sea_query::{PostgresQueryBuilder}; use std::error::Error; use std::fmt::{Display, Formatter}; use std::time::Instant; @@ -223,7 +222,7 @@ impl ArrowFlightSQLDatabase { #[async_trait] impl TimeSeriesQueryable for ArrowFlightSQLDatabase { async fn execute(&mut self, tsq: &TimeSeriesQuery) -> Result> { - let query_string = self.get_sql_string(tsq, PostgresQueryBuilder)?; + let query_string = self.get_sql_string(tsq, DatabaseType::Dremio)?; Ok(self.execute_sql_query(query_string).await?) } diff --git a/chrontext/src/timeseries_database/bigquery_database.rs b/chrontext/src/timeseries_database/bigquery_database.rs index f1f2ec0..2ace418 100644 --- a/chrontext/src/timeseries_database/bigquery_database.rs +++ b/chrontext/src/timeseries_database/bigquery_database.rs @@ -1,20 +1,20 @@ + use crate::timeseries_database::timeseries_sql_rewrite::{ TimeSeriesQueryToSQLError, TimeSeriesTable, }; -use crate::timeseries_database::{TimeSeriesQueryable, TimeSeriesSQLQueryable}; +use crate::timeseries_database::{DatabaseType, TimeSeriesQueryable, TimeSeriesSQLQueryable}; use crate::timeseries_query::TimeSeriesQuery; use async_trait::async_trait; use connectorx::prelude::*; use polars::prelude::PolarsError; use polars_core::error::ArrowError; use polars_core::prelude::{DataFrame, Series}; -use sea_query::PostgresQueryBuilder; use std::error::Error; use std::fmt::{Display, Formatter}; use std::sync::Arc; -use std::thread; use thiserror::Error; use tokio::runtime::Runtime; +use tokio::task; use 
tonic::Status; #[derive(Error, Debug)] @@ -64,11 +64,11 @@ impl BigQueryDatabase { #[async_trait] impl TimeSeriesQueryable for BigQueryDatabase { async fn execute(&mut self, tsq: &TimeSeriesQuery) -> Result> { - let query_string = self.get_sql_string(tsq, PostgresQueryBuilder)?; + let query_string = self.get_sql_string(tsq, DatabaseType::BigQuery)?; let key = self.gcp_sa_key.clone(); // Using a thread here since we do not want nested runtimes in the same thread let (chunks, schema) = - thread::spawn(move || { + task::spawn_blocking(move || { let source = BigQuerySource::new(Arc::new(Runtime::new().unwrap()), &key).unwrap(); let queries = [CXQuery::naked(query_string)]; let mut destination = Arrow2Destination::new(); @@ -80,15 +80,20 @@ impl TimeSeriesQueryable for BigQueryDatabase { dispatcher.run().unwrap(); let (chunks, schema) = destination.arrow().unwrap(); return (chunks, schema); - }) - .join() - .unwrap(); + }).await?; let mut series_vec = vec![]; - for (ch, field) in chunks.into_iter().zip(schema.fields.iter()) { - let mut array_refs = vec![]; - for arr in ch.into_arrays() { - array_refs.push(arr) + let mut array_ref_vecs = vec![]; + + for ch in chunks.into_iter() { + for (i,arr) in ch.into_arrays().into_iter().enumerate() { + if array_ref_vecs.len() < i+1 { + array_ref_vecs.push(vec![]); + } + array_ref_vecs.get_mut(i).unwrap().push(arr) } + } + + for (array_refs, field) in array_ref_vecs.into_iter().zip(schema.fields.iter()) { let ser = Series::try_from((field.name.as_str(), array_refs)).unwrap(); series_vec.push(ser); } diff --git a/chrontext/src/timeseries_database/timeseries_sql_rewrite.rs b/chrontext/src/timeseries_database/timeseries_sql_rewrite.rs index af7d8f8..9456f96 100644 --- a/chrontext/src/timeseries_database/timeseries_sql_rewrite.rs +++ b/chrontext/src/timeseries_database/timeseries_sql_rewrite.rs @@ -3,17 +3,22 @@ mod partitioning_support; use 
crate::timeseries_database::timeseries_sql_rewrite::expression_rewrite::SPARQLToSQLExpressionTransformer; use crate::timeseries_database::timeseries_sql_rewrite::partitioning_support::add_partitioned_timestamp_conditions; +use crate::timeseries_database::DatabaseType; use crate::timeseries_query::{BasicTimeSeriesQuery, Synchronizer, TimeSeriesQuery}; use oxrdf::{NamedNode, Variable}; use polars_core::datatypes::AnyValue; use polars_core::frame::DataFrame; -use sea_query::{Alias, BinOper, ColumnRef, DynIden, JoinType, Order, Query, SeaRc, SelectStatement, SimpleExpr, TableRef}; +use sea_query::extension::bigquery::{NamedField, Unnest}; +use sea_query::IntoIden; +use sea_query::{ + Alias, BinOper, ColumnRef, JoinType, Order, Query, SelectStatement, SimpleExpr, + TableRef, +}; use sea_query::{Expr as SeaExpr, Iden, Value}; use spargebra::algebra::{AggregateExpression, Expression}; use std::collections::{HashMap, HashSet}; use std::error::Error; use std::fmt::{Display, Formatter, Write}; -use sea_query::IntoIden; const YEAR_PARTITION_COLUMN_NAME: &str = "year_partition_column_name"; const MONTH_PARTITION_COLUMN_NAME: &str = "month_partition_column_name"; @@ -111,13 +116,18 @@ pub struct TimeSeriesTable { pub struct TimeSeriesQueryToSQLTransformer<'a> { pub partition_support: bool, pub tables: &'a Vec, + pub database_type: DatabaseType, } impl TimeSeriesQueryToSQLTransformer<'_> { - pub fn new(tables: &Vec) -> TimeSeriesQueryToSQLTransformer { + pub fn new( + tables: &Vec, + database_type: DatabaseType, + ) -> TimeSeriesQueryToSQLTransformer { TimeSeriesQueryToSQLTransformer { partition_support: check_partitioning_support(tables), tables, + database_type, } } @@ -184,7 +194,8 @@ impl TimeSeriesQueryToSQLTransformer<'_> { let mut sorted_cols: Vec<&String> = columns.iter().collect(); sorted_cols.sort(); for c in sorted_cols { - outer_select.expr(SimpleExpr::Column(ColumnRef::Column(Name::Column(c.clone()).into_iden(), + 
outer_select.expr(SimpleExpr::Column(ColumnRef::Column( + Name::Column(c.clone()).into_iden(), ))); } use_select = outer_select; @@ -243,7 +254,7 @@ impl TimeSeriesQueryToSQLTransformer<'_> { ) -> Result<(SelectStatement, HashSet), TimeSeriesQueryToSQLError> { let subquery_alias = "subquery"; let subquery_name = Name::Table(subquery_alias.to_string()); - let mut expr_transformer = self.create_transformer(Some(&subquery_name)); + let mut expr_transformer = self.create_transformer(Some(&subquery_name), self.database_type.clone()); let se = expr_transformer.sparql_expression_to_sql_expression(e)?; let (select, mut columns) = self.create_query_nested( @@ -306,21 +317,49 @@ impl TimeSeriesQueryToSQLTransformer<'_> { let mut static_select = Query::select(); let mapping_values_alias = "mapping"; - static_select.from_values(value_tuples, Alias::new(mapping_values_alias)); - static_select.expr_as( - SimpleExpr::Column(ColumnRef::TableColumn( - Name::Table(mapping_values_alias.to_string()).into_iden(), - Name::Column("EXPR$0".to_string()).into_iden(), - )), - Alias::new(identifier_colname), - ); - static_select.expr_as( - SimpleExpr::Column(ColumnRef::TableColumn( - Name::Table(mapping_values_alias.to_string()).into_iden(), - Name::Column("EXPR$1".to_string()).into_iden(), - )), - Alias::new(column_name), - ); + match &self.database_type { + DatabaseType::BigQuery => { + static_select.columns([ + ColumnRef::Column(Name::Column(identifier_colname.to_string()).into_iden()), + ColumnRef::Column(Name::Column(column_name.to_string()).into_iden()), + ]); + let mut structs = vec![]; + for (e, n) in value_tuples { + structs.push(SimpleExpr::Struct(vec![ + NamedField::new( + Some(identifier_colname.to_string()), + SimpleExpr::Value(Value::String(Some(Box::new(e.to_string())))), + ), + NamedField::new( + Some(column_name.to_string()), + SimpleExpr::Value(Value::BigInt(Some(n))), + ), + ])) + } + static_select.from(TableRef::Unnest( + Unnest::new(structs), + 
Name::Table("values".to_string()).into_iden(), + )); + } + DatabaseType::Dremio => { + static_select.from_values(value_tuples, Alias::new(mapping_values_alias)); + static_select.expr_as( + SimpleExpr::Column(ColumnRef::TableColumn( + Name::Table(mapping_values_alias.to_string()).into_iden(), + Name::Column("EXPR$0".to_string()).into_iden(), + )), + Alias::new(identifier_colname), + ); + static_select.expr_as( + SimpleExpr::Column(ColumnRef::TableColumn( + Name::Table(mapping_values_alias.to_string()).into_iden(), + Name::Column("EXPR$1".to_string()).into_iden(), + )), + Alias::new(column_name), + ); + } + } + let static_alias = "static_query"; let (basic_select, mut columns) = self.create_basic_select(btsq, project_date_partition)?; @@ -486,7 +525,7 @@ impl TimeSeriesQueryToSQLTransformer<'_> { expression: &Expression, timestamp_column: Option<&String>, ) -> Result<(SimpleExpr, bool), TimeSeriesQueryToSQLError> { - let mut transformer = self.create_transformer(None); + let mut transformer = self.create_transformer(None, self.database_type.clone()); let mut se = transformer.sparql_expression_to_sql_expression(expression)?; let mut partitioned = false; if self.partition_support { @@ -518,7 +557,7 @@ impl TimeSeriesQueryToSQLTransformer<'_> { let outer_query_str = "outer_query"; let outer_query_name = Name::Table(outer_query_str.to_string()); let mut new_columns = HashSet::new(); - let mut agg_transformer = self.create_transformer(Some(&outer_query_name)); + let mut agg_transformer = self.create_transformer(Some(&outer_query_name), self.database_type.clone()); let mut aggs = vec![]; for (_, agg) in aggregations { aggs.push(agg_transformer.sparql_aggregate_expression_to_sql_expression(agg)?); @@ -583,6 +622,7 @@ impl TimeSeriesQueryToSQLTransformer<'_> { fn create_transformer<'a>( &'a self, table_name: Option<&'a Name>, + database_type: DatabaseType ) -> SPARQLToSQLExpressionTransformer { if self.partition_support { SPARQLToSQLExpressionTransformer::new( @@ -590,9 
+630,9 @@ impl TimeSeriesQueryToSQLTransformer<'_> { Some(YEAR_PARTITION_COLUMN_NAME), Some(MONTH_PARTITION_COLUMN_NAME), Some(DAY_PARTITION_COLUMN_NAME), - ) + database_type) } else { - SPARQLToSQLExpressionTransformer::new(table_name, None, None, None) + SPARQLToSQLExpressionTransformer::new(table_name, None, None, None, database_type) } } } @@ -710,6 +750,7 @@ mod tests { use sea_query::PostgresQueryBuilder; use spargebra::algebra::{AggregateExpression, Expression, Function}; use std::vec; + use crate::timeseries_database::DatabaseType; #[test] pub fn test_translate() { @@ -761,7 +802,7 @@ mod tests { day_column: Some("dir2".to_string()), }; let tables = vec![table]; - let transformer = TimeSeriesQueryToSQLTransformer::new(&tables); + let transformer = TimeSeriesQueryToSQLTransformer::new(&tables, DatabaseType::Dremio); let (sql_query, _) = transformer.create_query(&tsq, false).unwrap(); assert_eq!( &sql_query.to_string(PostgresQueryBuilder), @@ -980,7 +1021,7 @@ mod tests { day_column: Some("dir2".to_string()), }; let tables = vec![table]; - let transformer = TimeSeriesQueryToSQLTransformer::new(&tables); + let transformer = TimeSeriesQueryToSQLTransformer::new(&tables, DatabaseType::Dremio); let (sql_query, _) = transformer.create_query(&tsq, false).unwrap(); let expected_str = r#"SELECT AVG("outer_query"."val_dir") AS "f7ca5ee9058effba8691ac9c642fbe95", AVG("outer_query"."val_speed") AS "990362f372e4019bc151c13baf0b50d5", "outer_query"."year" AS "year", "outer_query"."month" AS "month", "outer_query"."day" AS "day", "outer_query"."hour" AS "hour", "outer_query"."minute_10" AS "minute_10", "outer_query"."grouping_col_0" AS "grouping_col_0" FROM (SELECT "inner_query"."day" AS "day", "inner_query"."grouping_col_0" AS "grouping_col_0", "inner_query"."hour" AS "hour", "inner_query"."minute_10" AS "minute_10", "inner_query"."month" AS "month", "inner_query"."t" AS "t", "inner_query"."val_dir" AS "val_dir", "inner_query"."val_speed" AS "val_speed", 
"inner_query"."year" AS "year" FROM (SELECT "day" AS "day", "grouping_col_0" AS "grouping_col_0", "hour" AS "hour", "minute_10" AS "minute_10", "month" AS "month", "t" AS "t", "val_dir" AS "val_dir", "val_speed" AS "val_speed", "subquery"."year_partition_column_name" AS "year" FROM (SELECT "day" AS "day", "day_partition_column_name" AS "day_partition_column_name", "grouping_col_0" AS "grouping_col_0", "hour" AS "hour", "minute_10" AS "minute_10", "month_partition_column_name" AS "month_partition_column_name", "t" AS "t", "val_dir" AS "val_dir", "val_speed" AS "val_speed", "year_partition_column_name" AS "year_partition_column_name", "subquery"."month_partition_column_name" AS "month" FROM (SELECT "day_partition_column_name" AS "day_partition_column_name", "grouping_col_0" AS "grouping_col_0", "hour" AS "hour", "minute_10" AS "minute_10", "month_partition_column_name" AS "month_partition_column_name", "t" AS "t", "val_dir" AS "val_dir", "val_speed" AS "val_speed", "year_partition_column_name" AS "year_partition_column_name", "subquery"."day_partition_column_name" AS "day" FROM (SELECT "day_partition_column_name" AS "day_partition_column_name", "grouping_col_0" AS "grouping_col_0", "minute_10" AS "minute_10", "month_partition_column_name" AS "month_partition_column_name", "t" AS "t", "val_dir" AS "val_dir", "val_speed" AS "val_speed", "year_partition_column_name" AS "year_partition_column_name", date_part('hour', "subquery"."t") AS "hour" FROM (SELECT "day_partition_column_name" AS "day_partition_column_name", "grouping_col_0" AS "grouping_col_0", "month_partition_column_name" AS "month_partition_column_name", "t" AS "t", "val_dir" AS "val_dir", "val_speed" AS "val_speed", "year_partition_column_name" AS "year_partition_column_name", CAST(FLOOR(date_part('minute', "subquery"."t") / 10) AS INTEGER) AS "minute_10" FROM (SELECT "first_query"."day_partition_column_name" AS "day_partition_column_name", "first_query"."grouping_col_0" AS "grouping_col_0", 
"first_query"."month_partition_column_name" AS "month_partition_column_name", "first_query"."t" AS "t", "first_query"."val_speed" AS "val_speed", "first_query"."year_partition_column_name" AS "year_partition_column_name", "other_0"."day_partition_column_name" AS "day_partition_column_name", "other_0"."grouping_col_0" AS "grouping_col_0", "other_0"."month_partition_column_name" AS "month_partition_column_name", "other_0"."val_dir" AS "val_dir", "other_0"."year_partition_column_name" AS "year_partition_column_name" FROM (SELECT "basic_query"."day_partition_column_name" AS "day_partition_column_name", "basic_query"."month_partition_column_name" AS "month_partition_column_name", "basic_query"."t" AS "t", "basic_query"."val_speed" AS "val_speed", "basic_query"."year_partition_column_name" AS "year_partition_column_name", "static_query"."grouping_col_0" AS "grouping_col_0" FROM (SELECT "timestamp" AS "t", "dir3" AS "ts_external_id_1", "value" AS "val_speed", CAST("dir2" AS INTEGER) AS "day_partition_column_name", CAST("dir1" AS INTEGER) AS "month_partition_column_name", CAST("dir0" AS INTEGER) AS "year_partition_column_name" FROM "s3.ct-benchmark"."timeseries_double" WHERE "dir3" IN ('id1')) AS "basic_query" INNER JOIN (SELECT "mapping"."EXPR$0" AS "ts_external_id_1", "mapping"."EXPR$1" AS "grouping_col_0" FROM (VALUES ('id1', 0)) AS "mapping") AS "static_query" ON "static_query"."ts_external_id_1" = "basic_query"."ts_external_id_1") AS "first_query" INNER JOIN (SELECT "basic_query"."day_partition_column_name" AS "day_partition_column_name", "basic_query"."month_partition_column_name" AS "month_partition_column_name", "basic_query"."t" AS "t", "basic_query"."val_dir" AS "val_dir", "basic_query"."year_partition_column_name" AS "year_partition_column_name", "static_query"."grouping_col_0" AS "grouping_col_0" FROM (SELECT "timestamp" AS "t", "dir3" AS "ts_external_id_2", "value" AS "val_dir", CAST("dir2" AS INTEGER) AS "day_partition_column_name", CAST("dir1" AS INTEGER) AS 
"month_partition_column_name", CAST("dir0" AS INTEGER) AS "year_partition_column_name" FROM "s3.ct-benchmark"."timeseries_double" WHERE "dir3" IN ('id2')) AS "basic_query" INNER JOIN (SELECT "mapping"."EXPR$0" AS "ts_external_id_2", "mapping"."EXPR$1" AS "grouping_col_0" FROM (VALUES ('id2', 1)) AS "mapping") AS "static_query" ON "static_query"."ts_external_id_2" = "basic_query"."ts_external_id_2") AS "other_0" ON "first_query"."grouping_col_0" = "other_0"."grouping_col_0" AND "first_query"."t" = "other_0"."t" AND "first_query"."year_partition_column_name" = "other_0"."year_partition_column_name" AND "first_query"."month_partition_column_name" = "other_0"."month_partition_column_name" AND "first_query"."day_partition_column_name" = "other_0"."day_partition_column_name" WHERE ("year_partition_column_name" > 2022 OR ("year_partition_column_name" = 2022 AND "month_partition_column_name" > 8) OR ("year_partition_column_name" = 2022 AND "month_partition_column_name" = 8 AND "day_partition_column_name" > 30) OR ("year_partition_column_name" = 2022 AND "month_partition_column_name" = 8 AND "day_partition_column_name" = 30 AND "t" >= '2022-08-30 08:46:53')) AND ("year_partition_column_name" < 2022 OR ("year_partition_column_name" = 2022 AND "month_partition_column_name" < 8) OR ("year_partition_column_name" = 2022 AND "month_partition_column_name" = 8 AND "day_partition_column_name" < 30) OR ("year_partition_column_name" = 2022 AND "month_partition_column_name" = 8 AND "day_partition_column_name" = 30 AND "t" <= '2022-08-30 21:46:53'))) AS "subquery") AS "subquery") AS "subquery") AS "subquery") AS "subquery") AS "inner_query") AS "outer_query" GROUP BY "outer_query"."year", "outer_query"."month", "outer_query"."day", "outer_query"."hour", "outer_query"."minute_10", "outer_query"."grouping_col_0" ORDER BY "grouping_col_0" ASC"#; diff --git a/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite.rs 
b/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite.rs index f4171de..d925617 100644 --- a/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite.rs +++ b/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite.rs @@ -1,12 +1,14 @@ use oxrdf::vocab::xsd; use polars::export::chrono::{DateTime, NaiveDateTime, Utc}; -use sea_query::{BinOper, ColumnRef, Function, SimpleExpr, UnOper, Value}; -use sea_query::{Expr as SeaExpr, Func}; -use spargebra::algebra::Expression; +use sea_query::extension::bigquery::{BqFunc, DateTimePart}; use sea_query::IntoIden; +use sea_query::{BinOper, ColumnRef, SimpleExpr, UnOper, Value}; +use sea_query::{Expr as SeaExpr, Func}; +use spargebra::algebra::{Expression}; use crate::constants::DATETIME_AS_SECONDS; use crate::timeseries_database::timeseries_sql_rewrite::{Name, TimeSeriesQueryToSQLError}; +use crate::timeseries_database::DatabaseType; pub mod aggregate_expressions; @@ -16,6 +18,7 @@ pub(crate) struct SPARQLToSQLExpressionTransformer<'a> { month_col: Option<&'a str>, day_col: Option<&'a str>, pub used_partitioning: bool, + database_type: DatabaseType } impl SPARQLToSQLExpressionTransformer<'_> { @@ -24,6 +27,7 @@ impl SPARQLToSQLExpressionTransformer<'_> { year_col: Option<&'a str>, month_col: Option<&'a str>, day_col: Option<&'a str>, + database_type: DatabaseType ) -> SPARQLToSQLExpressionTransformer<'a> { SPARQLToSQLExpressionTransformer { table_name, @@ -31,6 +35,7 @@ impl SPARQLToSQLExpressionTransformer<'_> { month_col, day_col, used_partitioning: false, + database_type } } @@ -178,26 +183,42 @@ impl SPARQLToSQLExpressionTransformer<'_> { self.day_col.as_ref().unwrap(), ) } else { - let date_part_name = match f { - spargebra::algebra::Function::Year => "year", - spargebra::algebra::Function::Month => "month", - spargebra::algebra::Function::Day => "day", - spargebra::algebra::Function::Hours => "hour", - spargebra::algebra::Function::Minutes => "minute", - 
spargebra::algebra::Function::Seconds => "second", - _ => { - panic!("Cannot happen") + SimpleExpr::FunctionCall(match &self.database_type { + DatabaseType::BigQuery => { + let datetime_part = match f { + spargebra::algebra::Function::Year => DateTimePart::YEAR, + spargebra::algebra::Function::Month => DateTimePart::MONTH, + spargebra::algebra::Function::Day => DateTimePart::DAY, + spargebra::algebra::Function::Hours => DateTimePart::HOUR, + spargebra::algebra::Function::Minutes => DateTimePart::MINUTE, + spargebra::algebra::Function::Seconds => DateTimePart::SECOND, + _ => { + panic!("Cannot happen") + } + }; + BqFunc::extract(datetime_part, mapped_e) + }, + DatabaseType::Dremio => { + let date_part_name = match f { + spargebra::algebra::Function::Year => "year", + spargebra::algebra::Function::Month => "month", + spargebra::algebra::Function::Day => "day", + spargebra::algebra::Function::Hours => "hour", + spargebra::algebra::Function::Minutes => "minute", + spargebra::algebra::Function::Seconds => "second", + _ => { + panic!("Cannot happen") + } + }; + Func::cust(Name::Function("date_part".to_string()).into_iden()) + .args(vec![ + SimpleExpr::Value(Value::String(Some(Box::new( + date_part_name.to_string(), + )))), + mapped_e, + ]) } - }; - SimpleExpr::FunctionCall( - Func::cust(Name::Function("date_part".to_string()).into_iden()) - .args(vec![ - SimpleExpr::Value(Value::String(Some(Box::new( - date_part_name.to_string(), - )))), - mapped_e, - ]), - ) + }) } } spargebra::algebra::Function::Custom(c) => { @@ -205,15 +226,13 @@ impl SPARQLToSQLExpressionTransformer<'_> { let mapped_e = self.sparql_expression_to_sql_expression(e)?; if c.as_str() == DATETIME_AS_SECONDS { SimpleExpr::FunctionCall( - Func::cust( - Name::Function("UNIX_TIMESTAMP".to_string()).into_iden(), - ) - .args(vec![ - mapped_e, - SimpleExpr::Value(Value::String(Some(Box::new( - "YYYY-MM-DD HH:MI:SS.FFF".to_string(), - )))), - ]), + 
Func::cust(Name::Function("UNIX_TIMESTAMP".to_string()).into_iden()) + .args(vec![ + mapped_e, + SimpleExpr::Value(Value::String(Some(Box::new( + "YYYY-MM-DD HH:MI:SS.FFF".to_string(), + )))), + ]), ) } else if c.as_str() == xsd::INTEGER.as_str() { SimpleExpr::AsEnum( diff --git a/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite/aggregate_expressions.rs b/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite/aggregate_expressions.rs index 1d69b96..43da282 100644 --- a/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite/aggregate_expressions.rs +++ b/chrontext/src/timeseries_database/timeseries_sql_rewrite/expression_rewrite/aggregate_expressions.rs @@ -2,6 +2,7 @@ use super::SPARQLToSQLExpressionTransformer; use crate::timeseries_database::timeseries_sql_rewrite::TimeSeriesQueryToSQLError; use sea_query::{Func, SimpleExpr}; use spargebra::algebra::AggregateExpression; +use crate::timeseries_database::DatabaseType; impl SPARQLToSQLExpressionTransformer<'_> { //TODO: Support distinct in aggregates.. how??? 
diff --git a/chrontext/src/timeseries_database/timeseries_sql_rewrite/partitioning_support.rs b/chrontext/src/timeseries_database/timeseries_sql_rewrite/partitioning_support.rs index af3e5e8..81611c8 100644 --- a/chrontext/src/timeseries_database/timeseries_sql_rewrite/partitioning_support.rs +++ b/chrontext/src/timeseries_database/timeseries_sql_rewrite/partitioning_support.rs @@ -1,7 +1,7 @@ use super::Name; use log::debug; use polars_core::export::chrono::Datelike; -use sea_query::{IntoIden,BinOper, ColumnRef, SimpleExpr, Value}; +use sea_query::{IntoIden, BinOper, ColumnRef, SimpleExpr, Value, FunctionCall}; pub fn add_partitioned_timestamp_conditions( se: SimpleExpr, @@ -40,8 +40,7 @@ pub fn add_partitioned_timestamp_conditions( .collect(); let added = rewrites_and_added.iter().fold(false, |x, (_, y)| x || *y); let se_rewrites: Vec = rewrites_and_added.into_iter().map(|(x, _)| x).collect(); - //TODO: Upstream change to sea-query to avoid this messy workaround - (SimpleExpr::FunctionCall(func_call.clone().args(se_rewrites)), added) + (SimpleExpr::FunctionCall(FunctionCall::new(func_call.get_func().clone()).args(se_rewrites)), added) } SimpleExpr::Binary(left, op, right) => rewrite_binary_expression( *left, diff --git a/py_chrontext/src/lib.rs b/py_chrontext/src/lib.rs index 12ad28e..326957d 100644 --- a/py_chrontext/src/lib.rs +++ b/py_chrontext/src/lib.rs @@ -132,13 +132,6 @@ impl Engine { if self.engine.is_none() { return Err(PyQueryError::MissingTimeSeriesDatabaseError.into()); } - let res = env_logger::try_init(); - match res { - Ok(_) => {} - Err(_) => { - debug!("Tried to initialize logger which is already initialize") - } - } let mut builder = Builder::new_multi_thread(); builder.enable_all(); let df_result = builder @@ -290,6 +283,14 @@ impl TimeSeriesTable { #[pymodule] fn _chrontext(_py: Python<'_>, m: &PyModule) -> PyResult<()> { + let res = env_logger::try_init(); + match res { + Ok(_) => {} + Err(_) => { + debug!("Tried to initialize logger which 
is already initialized"); + } + } + m.add_class::()?; m.add_class::()?; m.add_class::()?;