diff --git a/Cargo.toml b/Cargo.toml
index 83dbe73..9d2d33a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,14 +34,10 @@ pyo3 = "0.22.0"
 reqwest = "0.12.3"
 env_logger = "0.11.3"
 thiserror = "1.0.58"
-base64 = "0.22.0"
 async-recursion = "1.1.0"
 async-trait = "0.1.81"
-crossbeam = "0.8.2"
 chrono = "0.4.37"
 filesize = "0.2.0"
-futures = "0.3.30"
-tokio-stream = "0.1.15"
 serde = "1.0.203"
 serde_json = "1.0.117"
 backoff = "0.4.0"
@@ -49,10 +45,6 @@ gcp-bigquery-client = "0.20.0"
 rayon = "1.10.0"
 opcua = {version="0.12.0", features = ["vendored-openssl"]}
 url = "2.5.2"
-bollard = "0.15.0"
-rstest = "0.21.0"
-serial_test = "3.1.1"
-futures-util = "0.3.30"
 
 [patch.crates-io]
 oxrdf = { git = 'https://github.com/oxigraph/oxigraph.git', rev = "1e08089454a9aa8b3f66f4a422aed27e73faae84"}
diff --git a/lib/bigquery-polars/Cargo.toml b/lib/bigquery-polars/Cargo.toml
index ae6f4dd..09ecf5e 100644
--- a/lib/bigquery-polars/Cargo.toml
+++ b/lib/bigquery-polars/Cargo.toml
@@ -7,6 +7,5 @@ edition = "2021"
 gcp-bigquery-client = {workspace = true, features = ["rust-tls"]}
 polars = {workspace = true, features = ["dtype-full", "cse", "nightly", "performant", "timezones", "lazy"]}
 thiserror.workspace = true
-backoff.workspace = true
 tokio = {workspace = true, features = ["time"]}
 rayon.workspace = true
diff --git a/lib/chrontext/Cargo.toml b/lib/chrontext/Cargo.toml
index 19b71dc..9301d6e 100644
--- a/lib/chrontext/Cargo.toml
+++ b/lib/chrontext/Cargo.toml
@@ -7,7 +7,6 @@ edition = "2021"
 
 virtualization = {path = "../virtualization"}
 virtualized_query = { path = "../virtualized_query" }
-templates.workspace = true
 polars = { workspace = true, features = [
     "lazy",
     "concat_str",
@@ -21,7 +20,6 @@ polars = { workspace = true, features = [
     "cse",
     "nightly",
     "performant"] }
-tokio = { workspace = true, features = ["rt-multi-thread", "rt"] }
 log.workspace = true
 spargebra.workspace = true
 representation.workspace=true
@@ -29,23 +27,9 @@ query_processing.workspace = true
 sparesults.workspace = true
 oxrdf.workspace = true
 reqwest = { workspace = true, features = ["stream"] }
-env_logger.workspace = true
 thiserror.workspace = true
 async-recursion.workspace = true
 async-trait.workspace = true
 oxigraph.workspace = true
-chrono = { workspace = true, features = ["clock"] }
 serde.workspace = true
-serde_json.workspace = true
 filesize.workspace = true
-pyo3.workspace = true
-
-[dev-dependencies]
-bollard.workspace = true
-rstest.workspace = true
-polars = { workspace = true, features = ["lazy", "unique_counts"] }
-tokio = { workspace = true, features = ["rt-multi-thread"] }
-serial_test.workspace = true
-futures-util.workspace = true
-reqwest = { workspace = true, features = ["stream", "json"] }
-opcua.workspace = true
\ No newline at end of file
diff --git a/lib/chrontext/src/combiner/virtualized_queries.rs b/lib/chrontext/src/combiner/virtualized_queries.rs
index bbb2396..c0e1b5a 100644
--- a/lib/chrontext/src/combiner/virtualized_queries.rs
+++ b/lib/chrontext/src/combiner/virtualized_queries.rs
@@ -15,6 +15,7 @@ use representation::{BaseRDFNodeType, RDFNodeType};
 use sparesults::QuerySolution;
 use std::collections::{HashMap, HashSet};
 use virtualized_query::{BasicVirtualizedQuery, VirtualizedQuery};
+use virtualized_query::pushdown_setting::PushdownSetting;
 
 impl Combiner {
     pub fn attach_expected_empty_results(&self, vq:&VirtualizedQuery, mut solution_mappings: SolutionMappings) -> SolutionMappings {
@@ -61,9 +62,12 @@ impl Combiner {
             return Ok(self.attach_expected_empty_results(&vq, solution_mappings))
         }
 
-        let on_cols = get_join_columns(&vq, &solution_mappings.rdf_node_types); //Find the columns we should join on:
-        vq = vq.add_sorting_pushdown(&on_cols);
+        let on_cols = get_join_columns(&vq, &solution_mappings.rdf_node_types);
+
+        if self.prepper.pushdown_settings.contains(&PushdownSetting::Ordering) {
+            vq = vq.add_sorting_pushdown(&on_cols);
+        }
 
         let EagerSolutionMappings {
             mappings,
             mut rdf_node_types,
diff --git a/lib/chrontext/src/preparing.rs b/lib/chrontext/src/preparing.rs
index d3cace6..8185f89 100644
--- a/lib/chrontext/src/preparing.rs
+++ b/lib/chrontext/src/preparing.rs
@@ -15,7 +15,7 @@ use virtualized_query::{BasicVirtualizedQuery, VirtualizedQuery};
 
 #[derive(Debug)]
 pub struct TimeseriesQueryPrepper {
-    pushdown_settings: HashSet<PushdownSetting>,
+    pub(crate) pushdown_settings: HashSet<PushdownSetting>,
     pub(crate) basic_virtualized_queries: Vec<BasicVirtualizedQuery>,
     grouping_counter: u16,
    rewritten_filters: HashMap<Context, Expression>,
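Note on the combiner hunk above: the sorting pushdown is no longer unconditional; it is only applied when the backend opts in. A minimal sketch of the gate, assuming only the (now pub(crate)) settings field from this diff:

use std::collections::HashSet;
use virtualized_query::pushdown_setting::PushdownSetting;

// Sketch of the new gating: an ordering pushdown is only added to the
// virtualized query when the backend's declared settings contain Ordering.
fn should_push_sorting(pushdown_settings: &HashSet<PushdownSetting>) -> bool {
    pushdown_settings.contains(&PushdownSetting::Ordering)
}
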
diff --git a/lib/virtualization/Cargo.toml b/lib/virtualization/Cargo.toml
index 51e9801..d4d02c6 100644
--- a/lib/virtualization/Cargo.toml
+++ b/lib/virtualization/Cargo.toml
@@ -17,7 +17,6 @@ polars = {workspace=true, features=[
     "nightly",
     "performant"] }
 opcua.workspace= true
-log.workspace = true
 oxrdf.workspace = true
 query_processing.workspace = true
 spargebra.workspace=true
diff --git a/lib/virtualization/src/bigquery.rs b/lib/virtualization/src/bigquery.rs
index 7f5245b..e129535 100644
--- a/lib/virtualization/src/bigquery.rs
+++ b/lib/virtualization/src/bigquery.rs
@@ -159,7 +159,7 @@ fn rename_non_alpha_vars(vq: VirtualizedQuery, rename_map: &mut HashMap {
+        VirtualizedQuery::Grouped(GroupedVirtualizedQuery{ context, vq, by, aggregations }) => {
             let new_vq = rename_non_alpha_vars(*vq, rename_map);
             let mut new_by = vec![];
             for v in by {
diff --git a/lib/virtualization/src/opcua.rs b/lib/virtualization/src/opcua.rs
index 1c3c469..cb2362e 100644
--- a/lib/virtualization/src/opcua.rs
+++ b/lib/virtualization/src/opcua.rs
@@ -92,7 +92,7 @@ impl VirtualizedOPCUADatabase {
         let mut colnames_identifiers = vec![];
         let mut grouping_col_lookup = HashMap::new();
         let grouping_columns = vq.get_groupby_columns();
-        let mut grouping_col_name = if let Some(g) = grouping_columns.get(0) {
+        let grouping_col_name = if let Some(g) = grouping_columns.get(0) {
             Some(g.deref().clone())
         } else {
             None
diff --git a/lib/virtualization/src/python.rs b/lib/virtualization/src/python.rs
index 9c4bf7a..4cbf69d 100644
--- a/lib/virtualization/src/python.rs
+++ b/lib/virtualization/src/python.rs
@@ -1,3 +1,5 @@
+mod sql_translation;
+
 use polars::prelude::DataFrame;
 use pydf_io::to_rust::polars_df_to_rust_df;
 use pyo3::prelude::*;
@@ -6,6 +8,7 @@ use std::collections::{HashSet};
 use virtualized_query::pushdown_setting::{all_pushdowns, PushdownSetting};
 use virtualized_query::python::PyVirtualizedQuery;
 use virtualized_query::VirtualizedQuery;
+use sql_translation::PYTHON_CODE;
 
 #[derive(Clone, Debug)]
 #[pyclass]
@@ -55,7 +58,7 @@ impl VirtualizedPythonDatabase {
 pub fn translate_sql(vq: &VirtualizedQuery, resource_sql_map: &Py<PyDict>, dialect: &str) -> PyResult<String> {
     Python::with_gil(|py| {
         let pyvq = PyVirtualizedQuery::new(vq.clone(), py)?;
-        let db_mod = PyModule::import_bound(py, "my_db")?;
+        let db_mod = PyModule::from_code_bound(py, PYTHON_CODE, "my_translator", "my_translator")?;
         let translate_sql_func = db_mod.getattr("translate_sql")?;
         let query_string = translate_sql_func.call((pyvq, dialect, resource_sql_map), None)?;
        query_string.extract::<String>()
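Note: PyModule::from_code_bound compiles the embedded source string into a module at runtime, so no "my_db" module needs to be importable from the interpreter's path. A minimal pyo3 0.22 sketch of the same pattern (snippet and names are illustrative, not the shipped translator):

use pyo3::prelude::*;

// Compile an embedded Python snippet into a module and call a function from
// it, mirroring how translate_sql now loads PYTHON_CODE instead of "my_db".
fn call_embedded(py: Python<'_>) -> PyResult<String> {
    let code = "def translate_sql(vq, dialect, resource_sql_map):\n    return 'SELECT 1'";
    let module = PyModule::from_code_bound(py, code, "my_translator", "my_translator")?;
    module.getattr("translate_sql")?
        .call1((py.None(), "postgres", py.None()))?
        .extract::<String>()
}
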
diff --git a/lib/virtualization/src/python/sql_translation.rs b/lib/virtualization/src/python/sql_translation.rs
index e69de29..0df062b 100644
--- a/lib/virtualization/src/python/sql_translation.rs
+++ b/lib/virtualization/src/python/sql_translation.rs
@@ -0,0 +1,350 @@
+pub const PYTHON_CODE: &str = r#"
+from datetime import datetime
+from typing import Dict, Literal, Any, List, Union
+
+from sqlalchemy.dialects import postgresql
+from sqlalchemy.sql.base import ColumnCollection
+from sqlalchemy.sql.functions import GenericFunction
+from sqlalchemy_bigquery.base import BigQueryDialect
+
+from chrontext import Expression, VirtualizedQuery, AggregateExpression
+from sqlalchemy import ColumnElement, Column, Table, MetaData, Select, select, literal, DateTime, values, cast, \
+    BigInteger, CompoundSelect, and_, literal_column, case, func, TIMESTAMP
+
+XSD = "http://www.w3.org/2001/XMLSchema#"
+XSD_INTEGER = XSD + "integer"
+FLOOR_DATE_TIME_TO_SECONDS_INTERVAL = "https://github.com/DataTreehouse/chrontext#FloorDateTimeToSecondsInterval"
+
+class unnest(GenericFunction):
+    name = "UNNEST"
+    package = "bq"
+    inherit_cache = True
+
+class struct(GenericFunction):
+    name = "STRUCT"
+    package = "bq"
+    inherit_cache = True
+
+def translate_sql(vq: VirtualizedQuery, dialect: Literal["bigquery", "postgres"],
+                  resource_sql_map: Dict[str, Any]) -> str:
+    mapper = SPARQLMapper(dialect, resource_sql_map)
+    q = mapper.virtualized_query_to_sql(vq)
+    match dialect:
+        case "bigquery":
+            use_dialect = BigQueryDialect()
+        case "postgres":
+            use_dialect = postgresql.dialect()
+    compiled = q.compile(dialect=use_dialect, compile_kwargs={"literal_binds": True})
+    print("\n")
+    print(compiled)
+    return str(compiled)
+
+
+class SPARQLMapper:
+    def __init__(self,
+                 dialect: Literal["bigquery", "postgres"],
+                 resource_sql_map: Dict[str, Union[Table, CompoundSelect]]):
+        self.dialect = dialect
+        self.resource_sql_map = resource_sql_map
+        self.counter = 0
+
+    def virtualized_query_to_sql(self, query: VirtualizedQuery) -> Select:
+        query_type = query.type_name()
+
+        match query_type:
+            case "Filtered":
+                sql_quer = self.virtualized_query_to_sql(query.query)
+                filter_expr = self.expression_to_sql(query.filter, sql_quer.selected_columns)
+                filtered = sql_quer.filter(filter_expr)
+                return filtered
+
+            case "Basic":
+                table = self.resource_sql_map[query.resource]
+                table = table.subquery(self.inner_name())
+
+                to_select = []
+                to_select.append(
+                    table.columns["id"].label(query.identifier_name)
+                )
+                for (k, v) in query.column_mapping.items():
+                    to_select.append(table.columns[k].label(v))
+                if query.grouping_column_name is not None:
+                    if self.dialect == "bigquery":
+                        structs = []
+                        for (id, group) in query.id_grouping_tuples:
+                            structs.append(f"STRUCT('{id}' as id, {group} as {query.grouping_column_name})")
+                        values_sub = func.bq.unnest(literal_column(f"[{', '.join(structs)}]")).table_valued(
+                            Column("id"),
+                            Column(query.grouping_column_name)
+                        )
+                        table = values_sub.join(
+                            table,
+                            onclause=and_(
+                                values_sub.columns["id"] == table.columns["id"],
+                                table.columns["id"].in_(query.ids)
+                            )
+                        )
+                        to_select.append(
+                            cast(
+                                values_sub.columns[query.grouping_column_name], BigInteger
+                            ).label(
+                                query.grouping_column_name
+                            )
+                        )
+
+                        sql_q = select(
+                            *to_select
+                        ).select_from(
+                            table
+                        )
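+                    # Postgres path below: the (id, group) pairs are bound as an
+                    # inline VALUES subquery and joined to the resource table,
+                    # mirroring the UNNEST/STRUCT construction used for BigQuery.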
+                    if self.dialect == "postgres":
+                        values_sub = values(
+                            Column("id"), Column(query.grouping_column_name),
+                            name=self.inner_name()
+                        ).data(
+                            query.id_grouping_tuples
+                        )
+                        table = values_sub.join(
+                            table,
+                            onclause=and_(
+                                values_sub.columns["id"] == table.columns["id"],
+                                table.columns["id"].in_(query.ids)
+                            )
+                        )
+                        to_select.append(
+                            cast(
+                                values_sub.columns[query.grouping_column_name], BigInteger
+                            ).label(
+                                query.grouping_column_name
+                            )
+                        )
+                        sql_q = select(
+                            *to_select
+                        ).select_from(
+                            table
+                        )
+                else:
+                    sql_q = select(
+                        *to_select
+                    ).where(
+                        table.columns["id"].in_(query.ids)
+                    )
+                return sql_q
+
+            case "Grouped":
+                sql_quer = self.virtualized_query_to_sql(query.query)
+                by = [sql_quer.columns[c.name].label(c.name) for c in query.by]
+                selection = by.copy()
+                for (v, agg) in query.aggregations:
+                    selection.append(
+                        self.aggregation_expression_to_sql(agg, sql_quer.columns).label(v.name)
+                    )
+                sql_quer = select(
+                    sql_quer.subquery("inner")
+                ).with_only_columns(
+                    *selection
+                ).group_by(
+                    *by
+                )
+                return sql_quer
+
+            case "ExpressionAs":
+                sql_quer = self.virtualized_query_to_sql(query.query)
+                sql_expression = self.expression_to_sql(query.expression, sql_quer.selected_columns)
+                sql_quer = sql_quer.add_columns(sql_expression.label(query.variable.name))
+                return sql_quer
+            case "InnerJoin":
+                sql_queries = []
+                for q in query.queries:
+                    sql_quer = self.virtualized_query_to_sql(q)
+                    sql_queries.append(sql_quer)
+                cols_keys = set()
+                cols = []
+                out_sql_quer = sql_queries.pop().subquery(self.inner_name())
+                for c in out_sql_quer.columns.keys():
+                    cols_keys.add(c)
+                    cols.append(out_sql_quer.columns[c].label(c))
+                for sql_quer in sql_queries:
+                    on = None
+                    sql_quer = sql_quer.subquery(self.inner_name())
+                    for c in sql_quer.columns.keys():
+                        if c not in cols_keys:
+                            cols_keys.add(c)
+                            cols.append(sql_quer.columns[c].label(c))
+                        if c in out_sql_quer.columns:
+                            new_on = out_sql_quer.columns[c] == sql_quer.columns[c]
+                            if on is not None:
+                                on = on & new_on
+                            else:
+                                on = new_on
+
+                    out_sql_quer = out_sql_quer.join(sql_quer, onclause=on)
+
+                out_sql_quer = select(*cols).select_from(out_sql_quer)
+                return out_sql_quer
+            case "Ordered":
+                sql_quer = self.virtualized_query_to_sql(query.query)
+                for o in query.ordering:
+                    sql_expr = self.expression_to_sql(o.expression, sql_quer.selected_columns)
+                    if o.ascending:
+                        sql_expr_order = sql_expr.asc()
+                    else:
+                        sql_expr_order = sql_expr.desc()
+                    sql_quer = sql_quer.order_by(sql_expr_order)
+                return sql_quer
+
+    def aggregation_expression_to_sql(
+            self,
+            aggregate_expression: AggregateExpression,
+            columns: ColumnCollection[str, ColumnElement],
+    ) -> ColumnElement:
+        sql_expression = self.expression_to_sql(aggregate_expression.expression, columns)
+        match aggregate_expression.name:
+            case "MIN":
+                return func.min(sql_expression)
+            case "MAX":
+                return func.max(sql_expression)
+            case "AVG":
+                return func.avg(sql_expression)
+            case "SUM":
+                return func.sum(sql_expression)
+            case "GROUP_CONCAT":
+                if aggregate_expression.separator is not None:
+                    return func.aggregate_strings(sql_expression,
+                                                  separator=literal_column(f"'{aggregate_expression.separator}'"))
+                else:
+                    return func.aggregate_strings(sql_expression, separator=literal_column("''"))
+            case _:
+                print(aggregate_expression.name)
+
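+    # Expressions are lowered recursively below: each SPARQL operator node
+    # maps to the corresponding SQLAlchemy operator over the current columns.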
+    def expression_to_sql(
+            self,
+            expression: Expression,
+            columns: ColumnCollection[str, ColumnElement]
+    ) -> Column | ColumnElement | int | float | bool | str:
+        expression_type = expression.expression_type()
+        match expression_type:
+            case "Variable":
+                return columns[expression.variable.name]
+            case "Bound":
+                return columns[expression.variable.name] != None
+            case "Greater":
+                left_sql = self.expression_to_sql(expression.left, columns)
+                right_sql = self.expression_to_sql(expression.right, columns)
+                return left_sql > right_sql
+            case "Less":
+                left_sql = self.expression_to_sql(expression.left, columns)
+                right_sql = self.expression_to_sql(expression.right, columns)
+                return left_sql < right_sql
+            case "GreaterOrEqual":
+                left_sql = self.expression_to_sql(expression.left, columns)
+                right_sql = self.expression_to_sql(expression.right, columns)
+                return left_sql >= right_sql
+            case "LessOrEqual":
+                left_sql = self.expression_to_sql(expression.left, columns)
+                right_sql = self.expression_to_sql(expression.right, columns)
+                return left_sql <= right_sql
+            case "Equal":
+                left_sql = self.expression_to_sql(expression.left, columns)
+                right_sql = self.expression_to_sql(expression.right, columns)
+                return left_sql == right_sql
+            case "And":
+                left_sql = self.expression_to_sql(expression.left, columns)
+                right_sql = self.expression_to_sql(expression.right, columns)
+                return left_sql & right_sql
+            case "Or":
+                left_sql = self.expression_to_sql(expression.left, columns)
+                right_sql = self.expression_to_sql(expression.right, columns)
+                return left_sql | right_sql
+            case "If":
+                left_sql = self.expression_to_sql(expression.left, columns)
+                middle_sql = self.expression_to_sql(expression.middle, columns)
+                right_sql = self.expression_to_sql(expression.right, columns)
+                return case((left_sql, middle_sql), else_=right_sql)
+            case "Not":
+                expression_sql = self.expression_to_sql(expression.expression, columns)
+                return ~expression_sql
+            case "Multiply":
+                left_sql = self.expression_to_sql(expression.left, columns)
+                right_sql = self.expression_to_sql(expression.right, columns)
+                return left_sql * right_sql
+            case "Divide":
+                left_sql = self.expression_to_sql(expression.left, columns)
+                right_sql = self.expression_to_sql(expression.right, columns)
+                return left_sql / right_sql
+            case "Add":
+                left_sql = self.expression_to_sql(expression.left, columns)
+                right_sql = self.expression_to_sql(expression.right, columns)
+                return left_sql + right_sql
+            case "Subtract":
+                left_sql = self.expression_to_sql(expression.left, columns)
+                right_sql = self.expression_to_sql(expression.right, columns)
+                return left_sql - right_sql
+            case "Literal":
+                native = expression.literal.to_native()
+                if type(native) == datetime:
+                    if native.tzinfo is not None:
+                        return literal(native, TIMESTAMP)
+                return literal(native)
+            case "FunctionCall":
+                sql_args = []
+                for a in expression.arguments:
+                    sql_args.append(self.expression_to_sql(a, columns))
+                return self.function_call_to_sql(expression.function, sql_args)
+            case "In":
+                sql_collection = []
+                for e in expression.expressions:
+                    sql_collection.append(self.expression_to_sql(e, columns))
+                sql_expression = self.expression_to_sql(expression.expression, columns)
+                return sql_expression.in_(sql_collection)
+            case "Coalesce":
+                sql_collection = []
+                for e in expression.expressions:
+                    sql_collection.append(self.expression_to_sql(e, columns))
+                return func.coalesce(*sql_collection)
+            case _:
+                print(type(expression))
+                print(expression)
+
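+    # In the match below, the trailing `case IRI:` arm is a capture pattern:
+    # it binds the function IRI string, so casts like xsd:integer and the
+    # chrontext floor-to-seconds-interval function dispatch by IRI comparison.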
+    def function_call_to_sql(self,
+                             function: str,
+                             sql_args: List[Column | ColumnElement | int | float | bool | str]) -> ColumnElement:
+        match function:
+            case "SECONDS":
+                return func.extract("SECOND", sql_args[0])
+            case "MINUTES":
+                return func.extract("MINUTE", sql_args[0])
+            case "HOURS":
+                return func.extract("HOUR", sql_args[0])
+            case "DAY":
+                return func.extract("DAY", sql_args[0])
+            case "MONTH":
+                return func.extract("MONTH", sql_args[0])
+            case "YEAR":
+                return func.extract("YEAR", sql_args[0])
+            case "FLOOR":
+                return func.floor(sql_args[0])
+            case "CEILING":
+                return func.ceiling(sql_args[0])
+            case IRI:
+                if IRI == XSD_INTEGER:
+                    return cast(sql_args[0], BigInteger)
+                elif IRI == FLOOR_DATE_TIME_TO_SECONDS_INTERVAL:
+                    if self.dialect == "postgres":
+                        return func.to_timestamp(
+                            func.extract("EPOCH", sql_args[0]) - func.mod(
+                                func.extract("EPOCH", sql_args[0]),
+                                sql_args[1])
+                        )
+                    elif self.dialect == "bigquery":
+                        return func.TIMESTAMP_SECONDS(
+                            func.UNIX_SECONDS(sql_args[0]) - func.mod(
+                                func.UNIX_SECONDS(sql_args[0]),
+                                sql_args[1])
+                        )
+
+    def inner_name(self) -> str:
+        name = f"inner_{self.counter}"
+        self.counter += 1
+        return name
+"#;
\ No newline at end of file
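A hypothetical call site for the embedded translator; the empty resource map is a placeholder (real maps carry a SQLAlchemy table or compound select per resource name):

use pyo3::prelude::*;
use pyo3::types::PyDict;
use virtualized_query::VirtualizedQuery;

// Sketch only: translate_sql (from python.rs above) takes the query, a Python
// dict mapping resource names to SQLAlchemy tables, and a dialect name, and
// returns the compiled SQL string.
fn compile_query(vq: &VirtualizedQuery) -> PyResult<String> {
    Python::with_gil(|py| {
        let resource_sql_map = PyDict::new_bound(py); // assumed populated elsewhere
        translate_sql(vq, &resource_sql_map.unbind(), "postgres")
    })
}
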
diff --git a/lib/virtualized_query/Cargo.toml b/lib/virtualized_query/Cargo.toml
index 29d5f88..6fa9f6d 100644
--- a/lib/virtualized_query/Cargo.toml
+++ b/lib/virtualized_query/Cargo.toml
@@ -16,7 +16,6 @@ polars = {workspace=true, features=[
 spargebra.workspace = true
 serde.workspace = true
 pyo3 = {workspace = true }
-thiserror.workspace = true
 
 [features]
 rdf-star = ["representation/rdf-star"]
\ No newline at end of file
diff --git a/lib/virtualized_query/src/lib.rs b/lib/virtualized_query/src/lib.rs
index 3e6326b..2a9fb6c 100644
--- a/lib/virtualized_query/src/lib.rs
+++ b/lib/virtualized_query/src/lib.rs
@@ -10,8 +10,9 @@ use spargebra::term::{NamedNodePattern, TermPattern, TriplePattern, Variable};
 use std::collections::HashSet;
 use std::error::Error;
 use std::fmt::{Display, Formatter};
-use representation::RDFNodeType;
+use spargebra::remove_sugar::{HAS_TIMESTAMP, HAS_VALUE};
 use templates::ast::{ConstantTerm, ConstantTermOrList, StottrTerm, Template};
+use templates::constants::OTTR_TRIPLE;
 
 pub const ID_VARIABLE_NAME: &str = "id";
 
@@ -165,6 +166,33 @@ impl BasicVirtualizedQuery {
             }
         }
         self.column_mapping.extend(new_mappings);
+
+        // Add hard-coded mappings for the timestamp and value variables.
+        for t in &template.pattern_list {
+            if t.template_name.as_str() == OTTR_TRIPLE {
+                if let Some(verb) = t.argument_list.get(1) {
+                    if let StottrTerm::ConstantTerm(ConstantTermOrList::ConstantTerm(ConstantTerm::Iri(v))) = &verb.term {
+                        if v == HAS_TIMESTAMP {
+                            if let Some(obj) = t.argument_list.get(2) {
+                                if let StottrTerm::Variable(v) = &obj.term {
+                                    if let Some(TermPattern::Variable(v)) = self.column_mapping.get(v) {
+                                        self.chrontext_timestamp_variable = Some(v.clone());
+                                    }
+                                }
+                            }
+                        } else if v == HAS_VALUE {
+                            if let Some(obj) = t.argument_list.get(2) {
+                                if let StottrTerm::Variable(v) = &obj.term {
+                                    if let Some(TermPattern::Variable(v)) = self.column_mapping.get(v) {
+                                        self.chrontext_value_variable = Some(v.clone());
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
     }
 }
@@ -304,15 +332,6 @@ impl VirtualizedQuery {
             VirtualizedQuery::ExpressionAs(t, ..) => t.expected_columns(),
         }
     }
-    //
-    // pub fn has_equivalent_value_variable(&self, variable: &Variable, context: &Context) -> bool {
-    //     for value_variable in self.get_value_variables() {
-    //         if value_variable.equivalent(variable, context) {
-    //             return true;
-    //         }
-    //     }
-    //     false
-    // }
 
     pub fn get_ids(&self) -> Vec<&String> {
         match self {
diff --git a/lib/virtualized_query/src/pushdown_setting.rs b/lib/virtualized_query/src/pushdown_setting.rs
index 76d7ff0..6b82ca5 100644
--- a/lib/virtualized_query/src/pushdown_setting.rs
+++ b/lib/virtualized_query/src/pushdown_setting.rs
@@ -2,11 +2,12 @@ use serde::{Deserialize, Serialize};
 use std::collections::HashSet;
 
 pub fn all_pushdowns() -> HashSet<PushdownSetting> {
-    [PushdownSetting::GroupBy, PushdownSetting::ValueConditions].into()
+    [PushdownSetting::GroupBy, PushdownSetting::ValueConditions, PushdownSetting::Ordering].into()
 }
 
 #[derive(Hash, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
 pub enum PushdownSetting {
     ValueConditions,
     GroupBy,
+    Ordering,
 }
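Backends that cannot guarantee ORDER BY semantics can keep advertising a subset instead of all_pushdowns(); a sketch with a hypothetical backend:

use std::collections::HashSet;
use virtualized_query::pushdown_setting::PushdownSetting;

// Hypothetical backend that pushes down value filters but not ordering; the
// combiner will then skip add_sorting_pushdown for queries against it.
fn my_backend_pushdowns() -> HashSet<PushdownSetting> {
    [PushdownSetting::ValueConditions].into()
}
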
diff --git a/lib/virtualized_query/src/python.rs b/lib/virtualized_query/src/python.rs
index 2c61e9f..a5dc24b 100644
--- a/lib/virtualized_query/src/python.rs
+++ b/lib/virtualized_query/src/python.rs
@@ -1,10 +1,10 @@
-use crate::{BasicVirtualizedQuery, Synchronizer, VirtualizedQuery};
+use crate::{VirtualizedQuery};
 use polars::export::ahash::{HashMap, HashMapExt};
 use polars::prelude::AnyValue;
 use pyo3::prelude::*;
 use representation::python::{PyIRI, PyLiteral, PyVariable};
 use spargebra::algebra::{AggregateExpression, AggregateFunction, Expression, OrderExpression};
-use spargebra::term::{TermPattern, Variable};
+use spargebra::term::{TermPattern};
 
 //
 // #[derive(Error, Debug)]
 // pub enum PyExpressionError {
diff --git a/py_chrontext/Cargo.toml b/py_chrontext/Cargo.toml
index 24653b4..c8ab937 100644
--- a/py_chrontext/Cargo.toml
+++ b/py_chrontext/Cargo.toml
@@ -14,9 +14,7 @@
 templates.workspace = true
 pydf_io.workspace = true
 pyo3 = { workspace=true, features = ["extension-module"] }
-async-trait.workspace = true
 thiserror.workspace = true
-polars = { workspace = true, features = ["performant", "cse", "nightly", "lazy"] }
 oxrdf.workspace = true
 tokio.workspace = true
 log.workspace = true
diff --git a/py_chrontext/tests/test_opcua.py b/py_chrontext/tests/test_opcua.py
index f45c1f4..a7c82c1 100644
--- a/py_chrontext/tests/test_opcua.py
+++ b/py_chrontext/tests/test_opcua.py
@@ -3,6 +3,8 @@ import pathlib
 import time
 from multiprocessing import Process
+from typing import Dict
+
 import polars as pl
 from polars.testing import assert_frame_equal
@@ -87,11 +89,36 @@ def oxigraph_testdata(oxigraph_db):
     res = ep.query()
     #print(res)
 
-def test_simplified_opcua_case(opcua_server, oxigraph_testdata):
+@pytest.fixture(scope="module")
+def resources() -> Dict[str, Template]:
+    ct = Prefix("ct", "https://github.com/DataTreehouse/chrontext#")
+    x = xsd()
+    id = Variable("id")
+    timestamp = Variable("timestamp")
+    value = Variable("value")
+    dp = Variable("dp")
+    resources = {
+        "my_resource": Template(
+            iri=ct.suf("my_resource"),
+            parameters=[
+                Parameter(id, rdf_type=RDFType.Literal(x.string)),
+                Parameter(timestamp, rdf_type=RDFType.Literal(x.dateTime)),
+                Parameter(value, rdf_type=RDFType.Literal(x.double)),
+            ],
+            instances=[
+                triple(id, ct.suf("hasDataPoint"), dp),
+                triple(dp, ct.suf("hasValue"), value),
+                triple(dp, ct.suf("hasTimestamp"), timestamp)
+            ]
+        )
+    }
+    return resources
+
+def test_simplified_opcua_case(opcua_server, oxigraph_testdata, resources):
     print("Begin test")
     opcua_db = VirtualizedOPCUADatabase(namespace=2, endpoint=OPCUA_ENDPOINT)
     print("created opcua backend")
-    engine = Engine(sparql_endpoint=OXIGRAPH_QUERY_ENDPOINT, virtualized_opcua_database=opcua_db)
+    engine = Engine(resources=resources, sparql_endpoint=OXIGRAPH_QUERY_ENDPOINT, virtualized_opcua_database=opcua_db)
     print("defined engine")
     df = engine.query("""
     PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
diff --git a/py_chrontext/tests/testdata/python_based/testdata.nt b/py_chrontext/tests/testdata/python_based/testdata.nt
index 48a6527..bc9109c 100644
--- a/py_chrontext/tests/testdata/python_based/testdata.nt
+++ b/py_chrontext/tests/testdata/python_based/testdata.nt
@@ -1,11 +1,11 @@
- .
- .
- .
- . "my_resource" .
- .
+ . .
+ .
+ . "ts2" .
+ . "ts1" .
- .
+ . "my_resource" .
+ .