chrontext: High-performance hybrid query engine for knowledge graphs and analytical data (e.g. time-series)
Chrontext allows you to use your knowledge graph to access large amounts of time-series or other analytical data. It uses a commodity SPARQL triplestore and your existing data storage infrastructure. It currently supports time-series stored in a PostgreSQL-compatible database such as DuckDB, in Google Cloud BigQuery (SQL), and in OPC UA HA, but can easily be extended to other APIs and databases.
Chrontext forms a semantic layer that allows self-service data access, abstracting away technical infrastructure. Users can create query-based inputs for data products. These inputs keep the data products up to date as the knowledge graph is maintained, and they can be deployed across heterogeneous on-premise and cloud infrastructures with the same API.
Chrontext is a high-performance Python library built in Rust using Polars, and relies heavily on packages from the Oxigraph project. Chrontext works with Apache Arrow, prefers time-series transport using Apache Arrow Flight and delivers results as Polars DataFrames.
Please reach out to Data Treehouse if you would like help trying Chrontext, or require support for a different database backend.
Chrontext can be installed with pip:
pip install chrontext
The API is documented HERE.
The code below assumes that a SPARQL endpoint and BigQuery are already set up with time-series data.
...
q = """
PREFIX xsd:<http://www.w3.org/2001/XMLSchema#>
PREFIX ct:<https://github.com/DataTreehouse/chrontext#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX rds: <https://github.com/DataTreehouse/solar_demo/rds_power#>
SELECT ?path ?t ?ts_pow_value ?ts_irr_value
WHERE {
?site a rds:Site;
rdfs:label "Jonathanland";
rds:functionalAspect ?block.
# At the Block level there is an irradiation measurement:
?block a rds:A;
ct:hasTimeseries ?ts_irr.
?ts_irr rdfs:label "RefCell1_Wm2".
# At the Inverter level, there is a Power measurement
?block rds:functionalAspect+ ?inv.
?inv a rds:TBB;
rds:path ?path;
ct:hasTimeseries ?ts_pow.
?ts_pow rdfs:label "InvPDC_kW".
?ts_pow ct:hasDataPoint ?ts_pow_datapoint.
?ts_pow_datapoint ct:hasValue ?ts_pow_value;
ct:hasTimestamp ?t.
?ts_irr ct:hasDataPoint ?ts_irr_datapoint.
?ts_irr_datapoint ct:hasValue ?ts_irr_value;
ct:hasTimestamp ?t.
FILTER(
?t >= "2018-08-24T12:00:00+00:00"^^xsd:dateTime &&
?t <= "2018-08-24T13:00:00+00:00"^^xsd:dateTime)
} ORDER BY ?path ?t
"""
df = engine.query(q)
This produces the following DataFrame:
path | t | ts_pow_value | ts_irr_value |
---|---|---|---|
str | datetime[ns, UTC] | f64 | f64 |
=.A1.RG1.TBB1 | 2018-08-24 12:00:00 UTC | 39.74 | 184.0 |
=.A1.RG1.TBB1 | 2018-08-24 12:00:01 UTC | 39.57 | 184.0 |
=.A1.RG1.TBB1 | 2018-08-24 12:00:02 UTC | 40.1 | 184.0 |
=.A1.RG1.TBB1 | 2018-08-24 12:00:03 UTC | 40.05 | 184.0 |
=.A1.RG1.TBB1 | 2018-08-24 12:00:04 UTC | 40.02 | 184.0 |
… | … | … | … |
=.A5.RG9.TBB1 | 2018-08-24 12:59:56 UTC | 105.5 | 427.5 |
=.A5.RG9.TBB1 | 2018-08-24 12:59:57 UTC | 104.9 | 427.6 |
=.A5.RG9.TBB1 | 2018-08-24 12:59:58 UTC | 105.6 | 428.0 |
=.A5.RG9.TBB1 | 2018-08-24 12:59:59 UTC | 105.9 | 428.0 |
=.A5.RG9.TBB1 | 2018-08-24 13:00:00 UTC | 105.7 | 428.5 |
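Because both data points in the query bind the same ?t variable, a power value and an irradiation value end up in the same row only when their timestamps are equal. The following is a minimal plain-Python sketch of that pairing, not chrontext's implementation. The first two readings are copied from the table above; the third irradiation reading and the helper name join_on_timestamp are hypothetical and only there to show an unmatched point being dropped.

```python
from datetime import datetime, timezone

UTC = timezone.utc

# Readings copied from the first two rows of the result table.
power = {
    datetime(2018, 8, 24, 12, 0, 0, tzinfo=UTC): 39.74,
    datetime(2018, 8, 24, 12, 0, 1, tzinfo=UTC): 39.57,
}
irradiation = {
    datetime(2018, 8, 24, 12, 0, 0, tzinfo=UTC): 184.0,
    datetime(2018, 8, 24, 12, 0, 1, tzinfo=UTC): 184.0,
    # Hypothetical reading with no power value at the same instant.
    datetime(2018, 8, 24, 12, 0, 1, 500000, tzinfo=UTC): 184.0,
}


def join_on_timestamp(left, right):
    """Inner join two {timestamp: value} mappings on equal timestamps."""
    shared = sorted(left.keys() & right.keys())
    return [(t, left[t], right[t]) for t in shared]


joined = join_on_timestamp(power, irradiation)
for t, pow_value, irr_value in joined:
    print(t.isoformat(), pow_value, irr_value)
```

Only the two timestamps present in both series survive, which mirrors how the shared variable behaves in the SPARQL query.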
In the following tutorial, we assume that you have a couple of CSV-files on disk that you want to query.
We assume that you have DuckDB and chrontext installed. If not, run pip install chrontext duckdb.
Installing chrontext will also install sqlalchemy, which we rely on to define the virtualized DuckDB tables.
Our CSV files look like this.
ts1.csv:
timestamp,value
2022-06-01T08:46:52,1
2022-06-01T08:46:53,10
..
2022-06-01T08:46:59,105
ts2.csv:
timestamp,value
2022-06-01T08:46:52,2
2022-06-01T08:46:53,20
...
2022-06-01T08:46:59,206
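The timestamps in these files carry no UTC offset, and this tutorial treats them as UTC. As a minimal standard-library sketch of that interpretation (the tutorial itself uses Polars for this step), the snippet below parses only the rows of ts1.csv that are shown above and tags them as UTC. The names TS1_SAMPLE and parse_rows are hypothetical.

```python
import csv
import io
from datetime import datetime, timezone

# Only the rows of ts1.csv that are visible above; elided rows are left out.
TS1_SAMPLE = """timestamp,value
2022-06-01T08:46:52,1
2022-06-01T08:46:53,10
2022-06-01T08:46:59,105
"""


def parse_rows(text):
    """Parse CSV text into (timezone-aware UTC datetime, int) tuples."""
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        naive = datetime.fromisoformat(record["timestamp"])
        # Assumption: the naive timestamps denote UTC.
        rows.append((naive.replace(tzinfo=timezone.utc), int(record["value"])))
    return rows


rows = parse_rows(TS1_SAMPLE)
print(rows[0][0].isoformat())  # 2022-06-01T08:46:52+00:00
```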
We need to create a class with a method query that takes a SQL string as its argument and returns a Polars DataFrame.
In this class, we simply hard-code the DuckDB setup in the constructor.
import duckdb
import polars as pl

class MyDuckDB():
    def __init__(self):
        con = duckdb.connect()
        con.execute("SET TIME ZONE 'UTC';")
        con.execute("""CREATE TABLE ts1 ("timestamp" TIMESTAMPTZ, "value" INTEGER)""")
        ts_1 = pl.read_csv("ts1.csv", try_parse_dates=True).with_columns(pl.col("timestamp").dt.replace_time_zone("UTC"))
        con.append("ts1", df=ts_1.to_pandas())
        con.execute("""CREATE TABLE ts2 ("timestamp" TIMESTAMPTZ, "value" INTEGER)""")
        ts_2 = pl.read_csv("ts2.csv", try_parse_dates=True).with_columns(pl.col("timestamp").dt.replace_time_zone("UTC"))
        con.append("ts2", df=ts_2.to_pandas())
        self.con = con

    def query(self, sql:str) -> pl.DataFrame:
        # We execute the query and return it as a Polars DataFrame.
        # Chrontext expects this method to exist in the provided class.
        df = self.con.execute(sql).pl()
        return df

my_db = MyDuckDB()
We first define a sqlalchemy select query involving the two tables. It is crucial that we have a column labelled "id" here. Chrontext will modify this query when executing hybrid queries.
from sqlalchemy import MetaData, Table, Column, bindparam
metadata = MetaData()
ts1_table = Table(
"ts1",
metadata,
Column("timestamp"),
Column("value")
)
ts2_table = Table(
"ts2",
metadata,
Column("timestamp"),
Column("value")
)
ts1 = ts1_table.select().add_columns(
bindparam("id1", "ts1").label("id"),
)
ts2 = ts2_table.select().add_columns(
bindparam("id2", "ts2").label("id"),
)
sql = ts1.union(ts2)
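To make the shape of this union concrete, here is a rough hand-written SQL equivalent, run against an in-memory SQLite database as a stand-in for DuckDB. This is a sketch under assumptions: the SQL that sqlalchemy actually renders uses bind parameters and may differ in detail, the ORDER BY is added here only to make the output deterministic, and just the first two visible rows of each CSV file are loaded.

```python
import sqlite3

con = sqlite3.connect(":memory:")

# Load the first two visible rows of each CSV file into stand-in tables.
sample_rows = {
    "ts1": [("2022-06-01T08:46:52", 1), ("2022-06-01T08:46:53", 10)],
    "ts2": [("2022-06-01T08:46:52", 2), ("2022-06-01T08:46:53", 20)],
}
for name, rows in sample_rows.items():
    con.execute(f'CREATE TABLE {name} ("timestamp" TEXT, "value" INTEGER)')
    con.executemany(f"INSERT INTO {name} VALUES (?, ?)", rows)

# Each branch adds a constant "id" column naming the series it came from.
UNION_SQL = """
SELECT "timestamp", "value", 'ts1' AS id FROM ts1
UNION
SELECT "timestamp", "value", 'ts2' AS id FROM ts2
ORDER BY id, "timestamp"
"""

result = con.execute(UNION_SQL).fetchall()
for row in result:
    print(row)
```

Every row of the combined result carries an id value, which is what lets rows be matched to identifiers in the knowledge graph.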
Now, we are ready to define the virtualized backend. We will annotate nodes of the graph with a resource data property.
These data properties will be linked to virtualized RDF triples in the DuckDB backend. The resource_sql_map decides which SQL is used for each resource property.
from chrontext import VirtualizedPythonDatabase
vdb = VirtualizedPythonDatabase(
database=my_db,
resource_sql_map={"my_resource": sql},
sql_dialect="postgres"
)
The triple below links ex:myWidget1 to the triples defined by the SQL above.
ex:myWidget1 ct:hasResource "my_resource" .
However, it will only be linked to those triples corresponding to rows where the identifier column equals the identifier associated with ex:myWidget1.
Below, we define that ex:myWidget1 is only linked to those rows where the id column is ts1.
ex:myWidget1 ct:hasIdentifier "ts1" .
In any such resource SQL, the id column is mandatory.
Next, we relate the rows returned by this SQL, each containing id, timestamp and value, to RDF triples using a template. It is crucial to have the column id.
from chrontext import Prefix, Variable, Template, Parameter, RDFType, Triple, XSD
ct = Prefix("ct", "https://github.com/DataTreehouse/chrontext#")
xsd = XSD()
id = Variable("id")
timestamp = Variable("timestamp")
value = Variable("value")
dp = Variable("dp")
resources = {
"my_resource": Template(
iri=ct.suf("my_resource"),
parameters=[
Parameter(id, rdf_type=RDFType.Literal(xsd.string)),
Parameter(timestamp, rdf_type=RDFType.Literal(xsd.dateTime)),
Parameter(value, rdf_type=RDFType.Literal(xsd.double)),
],
instances=[
Triple(id, ct.suf("hasDataPoint"), dp),
Triple(dp, ct.suf("hasValue"), value),
Triple(dp, ct.suf("hasTimestamp"), timestamp)
]
)}
This means that our instance ex:myWidget1 will be associated with a value and a timestamp (and a blank data point) for each row in ts1.csv.
For instance, the first row means we have:
ex:myWidget1 ct:hasDataPoint _:b1 .
_:b1 ct:hasTimestamp "2022-06-01T08:46:52Z"^^xsd:dateTime .
_:b1 ct:hasValue 1 .
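The row-to-triples correspondence can be sketched in plain Python. This is a conceptual illustration only and not how chrontext evaluates templates; the function expand_row and the blank node naming scheme are hypothetical.

```python
from itertools import count

CT = "https://github.com/DataTreehouse/chrontext#"


def expand_row(subject, timestamp, value, blank_ids):
    """Return the three triples that one (timestamp, value) row stands for."""
    dp = f"_:b{next(blank_ids)}"  # a fresh blank data point node per row
    return [
        (subject, CT + "hasDataPoint", dp),
        (dp, CT + "hasTimestamp", timestamp),
        (dp, CT + "hasValue", value),
    ]


blank_ids = count(1)
# The first row of ts1.csv, with its timestamp interpreted as UTC.
triples = expand_row("ex:myWidget1", "2022-06-01T08:46:52Z", 1, blank_ids)
for triple in triples:
    print(triple)
```

Each row contributes three triples, so the triple count grows linearly with the number of rows in the underlying tables.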
Chrontext is designed for cases where there are too many such triples to materialize, so we query them virtually instead.
The context for our analytical data (e.g. a model of an industrial asset) has to be stored in a SPARQL endpoint. In this case, we use the embedded Oxigraph engine from pyoxigraph. Now we assemble the pieces and create the engine.
from chrontext import Engine
from pyoxigraph import Store
oxigraph_store = Store()
oxigraph_store.bulk_load(path="my_graph.ttl")
engine = Engine(
resources,
virtualized_python_database=vdb,
sparql_embedded_oxigraph=oxigraph_store)
engine.init()
Now we can use our context to query the dataset. The aggregation below is pushed down into DuckDB.
The example below is a bit simple, but complex conditions can be used to identify ?w and ?s.
q = """
PREFIX xsd:<http://www.w3.org/2001/XMLSchema#>
PREFIX chrontext:<https://github.com/DataTreehouse/chrontext#>
PREFIX types:<http://example.org/types#>
SELECT ?w (SUM(?v) as ?sum_v) WHERE {
?w types:hasSensor ?s .
?s a types:ThingCounter .
?s chrontext:hasTimeseries ?ts .
?ts chrontext:hasDataPoint ?dp .
?dp chrontext:hasTimestamp ?t .
?dp chrontext:hasValue ?v .
FILTER(?t > "2022-06-01T08:46:53Z"^^xsd:dateTime) .
} GROUP BY ?w
"""
df = engine.query(q)
print(df)
This produces the following result:
w | sum_v |
---|---|
str | decimal[38,0] |
http://example.org/case#myWidget1 | 1215 |
http://example.org/case#myWidget2 | 1216 |
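Pushing the aggregation down means that the filtering and summing happen inside the database, so only one row per series is returned. The sketch below shows SQL of roughly that shape, run in SQLite as a stand-in for DuckDB. It is an assumption about the general form, not the SQL chrontext actually generates. It uses only the CSV rows visible earlier in this tutorial, so its sums differ from the table above, which includes the elided rows.

```python
import sqlite3

# Filter and aggregate in the database; only one row per series comes back.
# ISO-8601 strings in the same format compare correctly as plain text.
PUSHED_DOWN_SQL = """
WITH data(id, ts, value) AS (
    VALUES
        ('ts1', '2022-06-01T08:46:52', 1),
        ('ts1', '2022-06-01T08:46:53', 10),
        ('ts1', '2022-06-01T08:46:59', 105),
        ('ts2', '2022-06-01T08:46:52', 2),
        ('ts2', '2022-06-01T08:46:53', 20),
        ('ts2', '2022-06-01T08:46:59', 206)
)
SELECT id, SUM(value) AS sum_v
FROM data
WHERE ts > ?
GROUP BY id
ORDER BY id
"""

con = sqlite3.connect(":memory:")
sums = con.execute(PUSHED_DOWN_SQL, ("2022-06-01T08:46:53",)).fetchall()
print(sums)  # [('ts1', 105), ('ts2', 206)]
```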
Let us know if you have suggestions!
Chrontext will be put into use in the energy industry during the period, and will be stabilized as part of this process. We are very interested in your bug reports!
We are likely adding support for ADX/KustoQL. Let us know if this is something that would be useful for you.
We are likely adding support for Databricks SQL as the virtualization backend.
While chrontext is currently focused on time series data, we are incrementally adding support for contextualization of arbitrary analytical data.
Currently, we only support one database backend at a given time. We plan to support hybrid queries across multiple virtualized databases.
Chrontext is joint work by Magnus Bakken and Professor Ahmet Soylu at OsloMet. To read more about Chrontext, read the article Chrontext: Portable Sparql Queries Over Contextualised Time Series Data in Industrial Settings.
All code produced since August 1st, 2023 is copyrighted to Data Treehouse AS with an Apache 2.0 license unless otherwise noted.
All code which was produced before August 1st, 2023 is copyrighted to Prediktor AS with an Apache 2.0 license unless otherwise noted, and has been financed by The Research Council of Norway (grant no. 316656) and Prediktor AS as part of a PhD Degree. The code in this state is archived in the repository at https://github.com/DataTreehouse/chrontext.