Skip to content

Commit

Permalink
Begin databricks support
Browse files Browse the repository at this point in the history
  • Loading branch information
magbak committed Aug 26, 2024
1 parent 4a65832 commit c5858de
Show file tree
Hide file tree
Showing 17 changed files with 182 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ permissions:
env:
CARGO_TERM_COLOR: always
RUST_LOG: debug
MATURIN_VERSION: '1.7.0'
MATURIN_VERSION: '1.7.1'
RUST_TOOLCHAIN: nightly-2024-06-23

jobs:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/python_query_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
env:
CARGO_TERM_COLOR: always
RUST_LOG: debug
MATURIN_VERSION: '1.7.0'
MATURIN_VERSION: '1.7.1'
RUST_TOOLCHAIN: nightly-2024-06-23

jobs:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/python_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ permissions:
env:
CARGO_TERM_COLOR: always
RUST_TOOLCHAIN: nightly-2024-06-23
MATURIN_VERSION: '1.7.0'
MATURIN_VERSION: '1.7.1'
MATURIN_PYPI_TOKEN: ${{ secrets.PYPI_API_TOKEN }}

jobs:
Expand Down
40 changes: 35 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ members = [
#pydf_io = { path = "../maplib/lib/pydf_io"}
#representation = { path = "../maplib/lib/representation", features = ["rdf-star"]}
#templates = { path = "../maplib/lib/templates"}
spargebra = { git = "https://github.com/DataTreehouse/maplib", rev="3aaabe8bd2326fbf204456f961f972d927afad78", features = ["rdf-star"]}
query_processing = { git = "https://github.com/DataTreehouse/maplib", rev="3aaabe8bd2326fbf204456f961f972d927afad78" }
pydf_io = { git = "https://github.com/DataTreehouse/maplib", rev="3aaabe8bd2326fbf204456f961f972d927afad78" }
representation = { git = "https://github.com/DataTreehouse/maplib", rev="3aaabe8bd2326fbf204456f961f972d927afad78", features = ["rdf-star"] }
templates = { git = "https://github.com/DataTreehouse/maplib", rev="3aaabe8bd2326fbf204456f961f972d927afad78" }
spargebra = { git = "https://github.com/DataTreehouse/maplib", rev="3bf75ac20a71c9afeab07a7b4e0196fe51e43c61", features = ["rdf-star"]}
query_processing = { git = "https://github.com/DataTreehouse/maplib", rev="3bf75ac20a71c9afeab07a7b4e0196fe51e43c61" }
pydf_io = { git = "https://github.com/DataTreehouse/maplib", rev="3bf75ac20a71c9afeab07a7b4e0196fe51e43c61" }
representation = { git = "https://github.com/DataTreehouse/maplib", rev="3bf75ac20a71c9afeab07a7b4e0196fe51e43c61", features = ["rdf-star"] }
templates = { git = "https://github.com/DataTreehouse/maplib", rev="3bf75ac20a71c9afeab07a7b4e0196fe51e43c61" }


sparesults = { version = "0.2.0-alpha.5", features = ["rdf-star"] }
Expand All @@ -44,6 +44,7 @@ gcp-bigquery-client = "0.20.0"
rayon = "1.10.0"
opcua = {version="0.12.0", features = ["vendored-openssl"]}
url = "2.5.2"
uuid = {version = "1.10.0", features = ["fast-rng", "v4"]}

[patch.crates-io]
oxrdf = { git = 'https://github.com/magbak/oxigraph.git', rev = "b13df973ed2785de2ac41066ca4b62d88d3f5d40"}
Expand Down
1 change: 1 addition & 0 deletions lib/chrontext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ async-recursion.workspace = true
async-trait.workspace = true
oxigraph.workspace = true
filesize.workspace = true
uuid.workspace = true
11 changes: 10 additions & 1 deletion lib/chrontext/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use virtualization::{Virtualization, VirtualizedDatabase};
use virtualized_query::pushdown_setting::PushdownSetting;
use crate::rename_vars::rename_vars;

pub struct EngineConfig {
pub sparql_endpoint: Option<String>,
Expand Down Expand Up @@ -83,6 +84,7 @@ impl Engine {
let parsed_query = parse_sparql_select_query(query)?;
debug!("Parsed query: {}", parsed_query.to_string());
debug!("Parsed query algebra: {:?}", &parsed_query);
let (parsed_query, rename_map) = rename_vars(parsed_query);
let virtualized_iris = self.virtualization.get_virtualized_iris();
let first_level_virtualized_iris = self.virtualization.get_first_level_virtualized_iris();

Expand Down Expand Up @@ -112,10 +114,17 @@ impl Engine {
rewritten_filters,
self.virtualization.clone(),
);
let solution_mappings = combiner
let mut solution_mappings = combiner
.combine_static_and_time_series_results(static_queries_map, &preprocessed_query)
.await
.map_err(|x| ChrontextError::CombinerError(x))?;
for (original, renamed) in rename_map {
if let Some(dt) = solution_mappings.rdf_node_types.remove(&renamed) {
solution_mappings.mappings = solution_mappings.mappings.rename(&[renamed], &[original.clone()]);
solution_mappings.rdf_node_types.insert(original, dt);
}
}

let SolutionMappings {
mappings,
rdf_node_types,
Expand Down
1 change: 1 addition & 0 deletions lib/chrontext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ pub mod rewriting;
pub mod sparql_database;
mod sparql_result_to_polars;
pub mod splitter;
mod rename_vars;
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub(crate) fn rewrite_filter_expression(
context,
pushdown_settings,
);
return (rewrite.expression.take(), rewrite.lost_value);
(rewrite.expression.take(), rewrite.lost_value)
}

pub(crate) fn try_recursive_rewrite_expression(
Expand Down
5 changes: 2 additions & 3 deletions lib/virtualization/src/opcua.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use oxrdf::vocab::xsd;
use oxrdf::{Literal, Variable};
use polars::export::chrono::{DateTime as ChronoDateTime, Duration, TimeZone, Utc};
use polars::prelude::{
concat, AnyValue, DataFrame, DataType, IntoLazy, IntoVec, NamedFrom, Series, UnionArgs,
concat, AnyValue, DataFrame, DataType, IntoLazy, NamedFrom, Series, UnionArgs,
};
use query_processing::constants::DATETIME_AS_SECONDS;
use representation::query_context::Context;
Expand Down Expand Up @@ -93,8 +93,7 @@ impl VirtualizedOPCUADatabase {
let mut grouping_col_lookup = HashMap::new();
let grouping_columns = vq.get_groupby_columns();
let grouping_col_name = if let Some(g) = grouping_columns.into_iter().next() {
#[allow(suspicious_double_ref_op)]
Some(g.deref().clone())
Some(g.deref())
} else {
None
};
Expand Down
11 changes: 10 additions & 1 deletion lib/virtualization/src/python/sql_translation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ from sqlalchemy.dialects import postgresql
from sqlalchemy.sql.base import ColumnCollection
from sqlalchemy.sql.functions import GenericFunction
from sqlalchemy_bigquery.base import BigQueryDialect
from databricks.sqlalchemy import DatabricksDialect
from chrontext.vq import Expression, VirtualizedQuery, AggregateExpression
from sqlalchemy import ColumnElement, Column, Table, MetaData, Select, select, literal, DateTime, values, cast, \
Expand Down Expand Up @@ -34,6 +35,8 @@ def translate_sql(vq: VirtualizedQuery, dialect: Literal["bigquery", "postgres"]
use_dialect = BigQueryDialect()
case "postgres":
use_dialect = postgresql.dialect()
case "databricks":
use_dialect = DatabricksDialect()
compiled = q.compile(dialect=use_dialect, compile_kwargs={"literal_binds": True})
return str(compiled)
Expand Down Expand Up @@ -97,7 +100,7 @@ class SPARQLMapper:
).select_from(
table
)
if self.dialect == "postgres":
if self.dialect == "postgres" or self.dialect == "databricks":
values_sub = values(
Column("id"), Column(query.grouping_column_name),
name=self.inner_name()
Expand Down Expand Up @@ -349,6 +352,12 @@ class SPARQLMapper:
func.extract("EPOCH", sql_args[0]),
sql_args[1])
)
elif self.dialect == "databricks":
return func.TIMESTAMP_SECONDS(
func.UNIX_TIMESTAMP(sql_args[0]) - func.mod(
func.UNIX_TIMESTAMP(sql_args[0]),
sql_args[1])
)
elif self.dialect == "bigquery":
return func.TIMESTAMP_SECONDS(
func.UNIX_SECONDS(sql_args[0]) - func.mod(
Expand Down
2 changes: 1 addition & 1 deletion py_chrontext/chrontext/chrontext.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class VirtualizedPythonDatabase:
def __init__(self,
database: Any,
resource_sql_map: Optional[Dict[str, Any]],
sql_dialect: Optional[LiteralType["postgres", "bigquery"]]):
sql_dialect: Optional[LiteralType["postgres", "bigquery", "databricks"]]):
"""
See the tutorial in README.md for guidance on how to use this class.
This API is subject to change, it will be possible to specify what parts of the SPARQL query may be pushed down into the database.
Expand Down
8 changes: 4 additions & 4 deletions py_chrontext/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[project]
name = "chrontext"
description = "Hybrid SPARQL query engine for timeseries data"
dependencies = ["polars>=0.20.2", "pyarrow>=7.0.0", "pandas", "sqlalchemy>=2.0.31", "sqlalchemy_bigquery==1.11.0"]
dependencies = ["polars>=0.20.2", "pyarrow>=7.0.0", "pandas", "sqlalchemy>=2.0.31", "sqlalchemy_bigquery==1.11.0", "databricks-sql-connector>=3.3.0"]
readme = "README.md"
authors = [{name = "Magnus Bakken", email = "[email protected]" }]
license = {file = "LICENSE"}
authors = [{ name = "Magnus Bakken", email = "[email protected]" }]
license = { file = "LICENSE" }
requires-python = ">=3.9"
keywords = ["rdf", "graph", "arrow", "sparql", "timeseries"]
classifiers = [
Expand All @@ -26,5 +26,5 @@ Repository = "https://github.com/DataTreehouse/chrontext"
Changelog = "https://github.com/DataTreehouse/chrontext/releases"

[build-system]
requires = ["maturin==1.5.1"]
requires = ["maturin==1.7.1"]
build-backend = "maturin"
2 changes: 0 additions & 2 deletions py_chrontext/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use chrontext::combiner::CombinerError;
use chrontext::errors::ChrontextError as RustChrontextError;
use chrontext::splitter::QueryParseError;
use oxrdf::IriParseError;
use pyo3::{create_exception, exceptions::PyException, prelude::*};
use spargebra::SparqlSyntaxError;
Expand Down
1 change: 1 addition & 0 deletions py_chrontext/tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ requests>=2.32.0
sparqlwrapper==2.0.0
asyncua==1.0.4
duckdb>=1.0.0
pytest-mock==3.14.0
Loading

0 comments on commit c5858de

Please sign in to comment.