Multitype change
magbak committed Mar 7, 2024
1 parent 001ddfd commit b7a8590
Showing 15 changed files with 188 additions and 219 deletions.
10 changes: 6 additions & 4 deletions chrontext/Cargo.toml
@@ -4,12 +4,13 @@ version = "0.4.0"
edition = "2021"

[dependencies]
polars = {version="0.35.4", features=["lazy", "concat_str", "unique_counts", "group_by_list", "list_eval", "abs", "round_series", "is_in", "cum_agg", "dtype-full", "cse", "nightly", "performant"] }
polars = {version="0.37.0", features=["lazy", "concat_str", "unique_counts", "group_by_list", "list_eval", "abs", "round_series", "is_in", "cum_agg", "dtype-full", "cse", "nightly", "performant"] }
tokio = {version="1.18.2", features=["rt-multi-thread", "rt"]}
log = "0.4.17"
#spargebra = { path = "../../spargebra", features = ["rdf-star"]}
#query_processing = { path = "../../query_processing"}
query_processing = { git = "https://github.com/DataTreehouse/query_processing"}
#representation = { path ="../../representation"}
representation = { git = "https://github.com/DataTreehouse/representation"}
spargebra = { git = "https://github.com/DataTreehouse/spargebra", features = ["rdf-star"]}
sparesults = {version="0.1.8"}
@@ -18,13 +19,14 @@ reqwest= {version="0.11.23", features=["stream"]}
env_logger = "0.10.0"
tonic = "0.10.2"
thiserror = "1.0.31"
polars-core = "0.35.4"
polars-core = "0.37.0"
sea-query = { git="https://github.com/DataTreehouse/sea-query", branch="feature/bigquery_basic_support", features=["with-chrono", "backend-bigquery"]}
async-trait = "0.1.68"
base64 = "0.21.3"
opcua = {version="0.12.0", features = ["vendored-openssl"]}
async-recursion = "1.0.4"
-bigquery-polars = {git="https://github.com/DataTreehouse/bigquery-polars"}
+bigquery-polars = {path = "../../bigquery-polars/bigquery-polars"}
+#bigquery-polars = {git="https://github.com/DataTreehouse/bigquery-polars"}
crossbeam = {version = "0.8.2"}
serde_json = "1.0.105"
oxigraph = "0.3.22"
@@ -33,7 +35,7 @@ chrono = {version = "0.4.31", features = ["clock"]}
[dev-dependencies]
bollard = "0.15.0"
rstest = "0.18.2"
polars = {version="0.35.4", features=["lazy", "unique_counts", "group_by_list"]}
polars = {version="0.37.0", features=["lazy", "unique_counts", "group_by_list"]}
tokio = {version="1.18.2", features=["rt-multi-thread"]}
serial_test = "2.0.0"
futures-util = "0.3.21"
6 changes: 4 additions & 2 deletions chrontext/src/combiner/lazy_graph_patterns/left_join.rs
@@ -12,7 +12,8 @@ use log::debug;
use spargebra::algebra::{Expression, GraphPattern};
use spargebra::Query;
use std::collections::HashMap;
-use query_processing::graph_patterns::{filter, left_join};
+use polars::prelude::JoinType;
+use query_processing::graph_patterns::{filter, join};

impl Combiner {
#[async_recursion]
@@ -77,7 +78,8 @@ impl Combiner {
)
.await?;
right_solution_mappings = filter(right_solution_mappings, &expression_context)?;
+right_solution_mappings.rdf_node_types.remove(expression_context.as_str());
}
-Ok(left_join(left_solution_mappings, right_solution_mappings)?)
+Ok(join(left_solution_mappings, right_solution_mappings, JoinType::Left)?)
}
}
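The edit above replaces a dedicated `left_join` helper with a single `join` parameterized by polars' `JoinType`. A minimal sketch of the shape such a helper could take, assuming a solution-mappings type that pairs a `LazyFrame` with per-variable type metadata; the actual implementation lives in DataTreehouse/query_processing, and the names here are illustrative:

```rust
use std::collections::HashMap;
use polars::prelude::{col, JoinArgs, JoinType, LazyFrame};

// Illustrative stand-in for the crate's SolutionMappings type.
struct SolutionMappings {
    mappings: LazyFrame,
    rdf_node_types: HashMap<String, String>,
}

// Hypothetical generalized join: one function covers inner and left joins,
// selected by the JoinType argument, instead of one helper per join kind.
fn join(
    left: SolutionMappings,
    right: SolutionMappings,
    join_type: JoinType,
) -> SolutionMappings {
    // Join on the variables that both mapping sets bind.
    let on: Vec<_> = left
        .rdf_node_types
        .keys()
        .filter(|k| right.rdf_node_types.contains_key(*k))
        .map(|k| col(k))
        .collect();
    let mappings = left
        .mappings
        .join(right.mappings, on.clone(), on, JoinArgs::new(join_type));
    let mut rdf_node_types = left.rdf_node_types;
    rdf_node_types.extend(right.rdf_node_types);
    SolutionMappings { mappings, rdf_node_types }
}
```

Collapsing the join kinds into one entry point is what lets this call site pass `JoinType::Left` while the static-subquery code below passes `JoinType::Inner`.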
2 changes: 1 addition & 1 deletion chrontext/src/combiner/lazy_graph_patterns/union.rs
@@ -57,7 +57,7 @@ impl Combiner {
&right_context,
)
.await?;
-Ok(union(left_solution_mappings, right_solution_mappings)?)
+Ok(union(vec![left_solution_mappings, right_solution_mappings])?)

}
}
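The `union` helper now takes a `Vec` of solution mappings rather than exactly two, so n-ary unions collapse into a single call. A minimal sketch of the underlying polars operation, assuming the helper ultimately concatenates the lazy frames vertically (illustrative, not the actual query_processing code, which must also reconcile RDF node types across the inputs):

```rust
use polars::prelude::{concat, LazyFrame, PolarsResult, UnionArgs};

// Hypothetical core of an n-ary union: vertically concatenate all inputs.
fn union(inputs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
    concat(inputs, UnionArgs::default())
}
```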
4 changes: 2 additions & 2 deletions chrontext/src/combiner/static_subqueries.rs
@@ -6,7 +6,7 @@ use representation::query_context::Context;
use crate::sparql_result_to_polars::create_static_query_dataframe;
use log::debug;
use oxrdf::{Term, Variable};
-use polars::prelude::{col, Expr, IntoLazy};
+use polars::prelude::{col, Expr, IntoLazy, JoinType};
use polars_core::prelude::{UniqueKeepStrategy};
use spargebra::algebra::GraphPattern;
use spargebra::term::GroundTerm;
@@ -46,7 +46,7 @@ impl Combiner {
debug!("Static query results:\n {}", df);
let mut out_solution_mappings = SolutionMappings::new(df.lazy(), datatypes);
if let Some(use_solution_mappings) = use_solution_mappings {
-out_solution_mappings = join(out_solution_mappings, use_solution_mappings)?;
+out_solution_mappings = join(out_solution_mappings, use_solution_mappings, JoinType::Inner)?;
}
Ok(out_solution_mappings)
}
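This call site now states `JoinType::Inner` explicitly under the generalized helper. A small self-contained polars 0.37 sketch of the same kind of inner join between static query results and existing mappings (frame contents and the join column are made up):

```rust
use polars::prelude::*;

fn example() -> PolarsResult<DataFrame> {
    // Hypothetical static query results and existing solution mappings.
    let static_results = df!["ts" => ["a", "b"], "label" => ["x", "y"]]?.lazy();
    let mappings = df!["ts" => ["a"]]?.lazy();
    // An inner join keeps only solutions present on both sides.
    static_results
        .join(mappings, [col("ts")], [col("ts")], JoinArgs::new(JoinType::Inner))
        .collect()
}
```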
23 changes: 15 additions & 8 deletions chrontext/src/combiner/time_series_queries.rs
@@ -7,11 +7,11 @@ use log::debug;
use oxrdf::vocab::xsd;
use oxrdf::Term;
use polars::prelude::{col, Expr, IntoLazy, JoinArgs, JoinType};
-use polars_core::prelude::{DataFrame, DataType};
+use polars_core::prelude::{CategoricalOrdering, DataFrame, DataType};
use sparesults::QuerySolution;
use std::collections::{HashMap, HashSet};
use polars_core::series::Series;
-use representation::RDFNodeType;
+use representation::{BaseRDFNodeType, RDFNodeType};

impl Combiner {
pub async fn execute_attach_time_series_query(
@@ -23,19 +23,26 @@ impl Combiner {
if !tsq.has_identifiers() {
let mut expected_cols:Vec<_> = tsq.expected_columns().into_iter().collect();
expected_cols.sort();
+let timestamp_vars:Vec<_> = tsq.get_timestamp_variables().into_iter().map(|x|x.variable.as_str()).collect();
let drop_cols = get_drop_cols(tsq);
let mut series_vec = vec![];
for e in expected_cols {
if !drop_cols.contains(e) {
-series_vec.push(Series::new_empty(e, &DataType::Null));
-solution_mappings.rdf_node_types.insert(e.to_string(), RDFNodeType::None);
+if timestamp_vars.contains(&e) {
+let dt = BaseRDFNodeType::Literal(xsd::DATE_TIME.into_owned());
+series_vec.push(Series::new_empty(e, &dt.polars_data_type()));
+solution_mappings.rdf_node_types.insert(e.to_string(), dt.as_rdf_node_type());
+} else {
+series_vec.push(Series::new_empty(e, &BaseRDFNodeType::None.polars_data_type()));
+solution_mappings.rdf_node_types.insert(e.to_string(), RDFNodeType::None);
+}
}
}
let df = DataFrame::new(series_vec).unwrap();
for d in drop_cols {
if solution_mappings.rdf_node_types.contains_key(&d) {
solution_mappings.rdf_node_types.remove(&d);
-solution_mappings.mappings = solution_mappings.mappings.drop_columns(vec![d]);
+solution_mappings.mappings = solution_mappings.mappings.drop(vec![d]);
}
}
solution_mappings.mappings = solution_mappings.mappings.join(df.lazy(), vec![], vec![], JoinArgs::new(JoinType::Cross));
@@ -84,10 +91,10 @@ impl Combiner {
solution_mappings.mappings = solution_mappings.mappings.collect().unwrap().lazy();
let mut ts_lf = ts_df.lazy();
if let Some(cat_col) = &to_cat_col {
-ts_lf = ts_lf.with_column(col(cat_col).cast(DataType::Categorical(None)));
+ts_lf = ts_lf.with_column(col(cat_col).cast(DataType::Categorical(None, CategoricalOrdering::Physical)));
solution_mappings.mappings = solution_mappings
.mappings
-.with_column(col(cat_col).cast(DataType::Categorical(None)));
+.with_column(col(cat_col).cast(DataType::Categorical(None, CategoricalOrdering::Physical)));
}
let on_reverse_false = vec![false].repeat(on_cols.len());
ts_lf = ts_lf.sort_by_exprs(on_cols.as_slice(), on_reverse_false.as_slice(), true, false);
@@ -106,7 +113,7 @@
on_cols.as_slice(),
JoinArgs::new(JoinType::Inner),
)
-.drop_columns(drop_cols.iter());
+.drop(drop_cols.iter());
for c in &drop_cols {
solution_mappings.rdf_node_types.remove(c);
}
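Two polars 0.37 API changes drive the mechanical edits in this file: `DataType::Categorical` gained an explicit `CategoricalOrdering` parameter, and `LazyFrame::drop_columns` was renamed to `LazyFrame::drop`. A minimal sketch of both (column names are illustrative; assumes the `dtype-categorical` feature, which the `dtype-full` feature above includes):

```rust
use polars::prelude::*;

fn example(lf: LazyFrame) -> LazyFrame {
    // polars 0.37: categorical casts take an explicit ordering
    // (previously just DataType::Categorical(None)).
    let lf = lf.with_column(
        col("id").cast(DataType::Categorical(None, CategoricalOrdering::Physical)),
    );
    // polars 0.37: drop_columns(...) was renamed to drop(...).
    lf.drop(["helper_col"])
}
```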
2 changes: 1 addition & 1 deletion chrontext/src/sparql_database/sparql_endpoint.rs
@@ -1,6 +1,6 @@
use super::SparqlQueryable;
use async_trait::async_trait;
-use reqwest::header::{ACCEPT, CONTENT_TYPE, USER_AGENT};
+use reqwest::header::{ACCEPT, USER_AGENT};
use reqwest::StatusCode;
use sparesults::{
ParseError, QueryResultsFormat, QueryResultsParser, QueryResultsReader, QuerySolution,
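With the `CONTENT_TYPE` import removed, this module only sets `ACCEPT` and `USER_AGENT` explicitly. A hedged sketch of what a SPARQL-over-HTTP GET request with those two headers can look like in reqwest (the endpoint handling, agent string, and error handling here are illustrative, not the module's actual code):

```rust
use reqwest::header::{ACCEPT, USER_AGENT};

// Illustrative SPARQL query request: the accept header asks for the
// SPARQL JSON results format that sparesults can parse.
async fn query_endpoint(endpoint: &str, query: &str) -> Result<String, reqwest::Error> {
    reqwest::Client::new()
        .get(endpoint)
        .query(&[("query", query)])
        .header(ACCEPT, "application/sparql-results+json")
        .header(USER_AGENT, "chrontext")
        .send()
        .await?
        .text()
        .await
}
```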