Skip to content

Commit

Permalink
fixes #133
Browse files Browse the repository at this point in the history
  • Loading branch information
WolfgangFahl committed Sep 13, 2024
1 parent e40aae6 commit 2261566
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 89 deletions.
24 changes: 19 additions & 5 deletions lodstorage/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,37 @@

import argparse
import re
from typing import Dict, Optional
from typing import Dict,Optional
from lodstorage.yamlable import lod_storable


@lod_storable
class Param:
"""
a parameter
"""
name:str
type:str
default_value: str


class Params:
"""
parameter handling
"""

def __init__(self, query: str, illegal_chars: str = """"[;<>&|]"'"""):
def __init__(self, query: str, illegal_chars: str = """"[;<>&|]"'""",with_audit:bool=True):
"""
constructor
Args:
query(str): the query to analyze for parameters
illegal_chars: chars that may not be in the values
query (str): the query to analyze for parameters
illegal_chars (str): chars that may not be in the values
with_audit (bool): if True audit parameters
"""
self.illegal_chars = illegal_chars
self.query = query
self.with_audit=with_audit
self.pattern = re.compile(r"{{\s*(\w+)\s*}}")
self.params = self.pattern.findall(query)
self.params_dict = {param: "" for param in self.params}
Expand Down Expand Up @@ -56,7 +69,8 @@ def apply_parameters(self) -> str:
Returns:
str: The query with Jinja templates replaced by parameter values.
"""
self.audit()
if self.with_audit:
self.audit()
query = self.query
for param, value in self.params_dict.items():
pattern = re.compile(r"{{\s*" + re.escape(param) + r"\s*\}\}")
Expand Down
134 changes: 68 additions & 66 deletions lodstorage/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import urllib
from enum import Enum
from pathlib import Path

from dataclasses import field
import yaml
from pygments import highlight
from pygments.formatters.html import HtmlFormatter
Expand All @@ -25,7 +25,9 @@
# original is at
from lodstorage.jsonable import JSONAble
from lodstorage.mwTable import MediaWikiTable

from lodstorage.yamlable import lod_storable
from typing import Dict,List,Optional
from lodstorage.params import Params, Param

class Format(Enum):
"""
Expand Down Expand Up @@ -282,55 +284,53 @@ def asText(self):
)
return fixedStr


class Query(object):
"""a Query e.g. for SPAQRL"""

def __init__(
self,
name: str,
query: str,
lang="sparql",
endpoint: str = None,
database: str = "blazegraph",
title: str = None,
description: str = None,
limit: int = None,
prefixes=None,
tryItUrl: str = None,
formats: list = None,
debug=False,
):
"""
constructor
Args:
name(string): the name/label of the query
query(string): the native Query text e.g. in SPARQL
lang(string): the language of the query e.g. SPARQL
endpoint(string): the endpoint url to use
database(string): the type of database e.g. "blazegraph"
title(string): the header/title of the query
description(string): the description of the query
limit(int): the limit of the query default: None
prefixes(list): list of prefixes to be resolved
tryItUrl(str): the url of a "tryit" webpage
formats(list): key,value pairs of ValueFormatters to be applied
debug(boolean): true if debug mode should be switched on
"""
self.name = name
self.query = query
self.lang = lang
self.endpoint = endpoint
self.database = database
self.tryItUrl = tryItUrl

self.title = title = name if title is None else title
self.description = "" if description is None else description
self.limit = limit
self.prefixes = prefixes
self.debug = debug
self.formats = formats
self.formatCallBacks = []
@lod_storable
class Query:
"""
A Query e.g. for SPARQL
Attributes:
name (str): the name/label of the query
query (str): the native Query text e.g. in SPARQL
lang (str): the language of the query e.g. SPARQL
sparql(str): SPARQL querycode
sql(str): SQL query code
ask(atr): SMW ASK query code
endpoint (str): the endpoint url to use
database (str): the type of database e.g. "blazegraph"
title (str): the header/title of the query
description (str): the description of the query
limit (int): the limit of the query
prefixes (list): list of prefixes to be resolved
tryItUrl (str): the url of a "tryit" webpage
formats (list): key,value pairs of ValueFormatters to be applied
debug (bool): true if debug mode should be switched on
"""
name: str
query: str
lang: str = "sparql"
sparql: Optional[str] = None
sql: Optional[str]=None
ask: Optional[str]=None
endpoint: Optional[str] = None
database: str = "blazegraph"
title: Optional[str] = None
description: str = ""
limit: Optional[int] = None
prefixes: Optional[List[str]] = None
tryItUrl: Optional[str] = None
formats: Optional[List] = None
debug: bool = False
formatCallBacks: List = field(default_factory=list)
param_list: List[Param] = field(default_factory=list)


def __post_init__(self):
if self.title is None:
self.title = self.name
self.params = Params(self.query)

def __str__(self):
queryStr = "\n".join(
Expand All @@ -342,6 +342,15 @@ def __str__(self):
)
return f"{queryStr}"

def apply_default_params(self):
"""
apply my default parameters
"""
for param in self.param_list:
value = param.default_value
self.params.params_dict[param.name] = value
self.params.apply_parameters()

def addFormatCallBack(self, callback):
self.formatCallBacks.append(callback)

Expand Down Expand Up @@ -598,7 +607,6 @@ def documentQueryResult(
)
return queryResultDocumentation


class QueryManager(object):
"""
manages pre packaged Queries
Expand All @@ -620,26 +628,20 @@ def __init__(
self.queriesByName = {}
self.lang = lang
self.debug = debug
queries = QueryManager.getQueries(
queries = self.getQueries(
queriesPath=queriesPath, with_default=with_default
)
for name, queryDict in queries.items():
if self.lang in queryDict:
queryText = queryDict.pop(self.lang)
for qformat in ["sparql", "sql", "ask"]: # drop not needed query variants
if qformat in queryDict:
queryDict.pop(qformat)
query = Query(
name=name,
query=queryText,
lang=self.lang,
**queryDict,
debug=self.debug,
)
queryDict["name"]=name
queryDict["lang"]=self.lang
if not "query" in queryDict:
queryDict["query"]=queryDict[self.lang]
query=Query.from_dict(queryDict)
query.debug=self.debug
self.queriesByName[name] = query

@staticmethod
def getQueries(queriesPath=None, with_default: bool = True):
def getQueries(self,queriesPath=None, with_default: bool = True):
"""
get the queries for the given queries Path
Expand Down
2 changes: 1 addition & 1 deletion lodstorage/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ class Version(object):
name = "pylodstorage"
version = lodstorage.__version__
date = "2020-09-10"
updated = "2024-09-11"
updated = "2024-09-13"
description = "python List of Dict (Table) Storage library"
35 changes: 35 additions & 0 deletions sampledata/wikidata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,38 @@
}
GROUP BY ?event ?eventLabel
ORDER BY DESC(?date)
'WikidataItemsNearItem':
# This query finds Wikidata items near a specified Wikidata item.
# It uses the Wikidata Query Service to find places within a given radius
# of the specified item and returns them ordered by distance.
param_list:
- name: item
type: WikidataItem
default_value: Q878253 # Schloss Weimar
- name: radius
type: float
default_value: 0.5 # 0.5 km
- name: limit
type: int
default_value: 50
sparql: |
#defaultView:Map
SELECT ?place ?placeLabel ?location ?dist
WHERE {
# coordinates of the item
wd:{{ item }} wdt:P625 ?itemLoc .
SERVICE wikibase:around {
?place wdt:P625 ?location .
bd:serviceParam wikibase:center ?itemLoc .
bd:serviceParam wikibase:radius "{{ radius }}" .
bd:serviceParam wikibase:distance ?dist.
}
# Optional: Filter for specific types of places
# FILTER EXISTS {
# ?place wdt:P31/wdt:P279* wd:Q41176 . # Instance of building or subclass of building
# }
FILTER(?place != wd:{{item}}) # Exclude item itself
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
ORDER BY ASC(?dist)
LIMIT {{ limit }}
60 changes: 43 additions & 17 deletions tests/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os
from argparse import Namespace
from contextlib import redirect_stdout
import traceback

import tests.test_sqlite3
from lodstorage.query import (
Expand All @@ -31,6 +32,10 @@ class TestQueries(Basetest):
Test query handling
"""

def setUp(self, debug=False, profile=True):
Basetest.setUp(self, debug=debug, profile=profile)
self.wikidata_queries_path=f"{os.path.dirname(__file__)}/../sampledata/wikidata.yaml"

def testSQLQueries(self):
"""
see https://github.com/WolfgangFahl/pyLoDStorage/issues/19
Expand All @@ -47,30 +52,51 @@ def testSQLQueries(self):
print(resultDoc)
pass

def runQuery(self,query,show:bool=False):
if show:
print(f"{query.name}:{query}")
endpoint = SPARQL(query.endpoint)
try:
if query.params.has_params:
query.apply_default_params()
pass
qlod = endpoint.queryAsListOfDicts(query.query,param_dict=query.params.params_dict)
for tablefmt in ["mediawiki", "github", "latex"]:
doc = query.documentQueryResult(
qlod, tablefmt=tablefmt, floatfmt=".0f"
)
docstr = doc.asText()
if show:
print(docstr)

except Exception as ex:
print(f"{query.title} at {query.endpoint} failed: {ex}")
print(traceback.format_exc())


def testQueryWithParams(self):
"""
test SPARQL Query with parameters
"""
show = self.debug
show = True
qm = QueryManager( queriesPath=self.wikidata_queries_path,with_default=False,lang="sparql", debug=False)
query=qm.queriesByName["WikidataItemsNearItem"]
query.endpoint="https://query.wikidata.org/sparql"
self.assertIsInstance(query, Query)
self.runQuery(query,show=show)
pass

def testSparqlQueries(self):
"""
test SPARQL queries
"""
show = self.debug
show = True
#show = True
qm = QueryManager(lang="sparql", debug=False)
for name, query in qm.queriesByName.items():
if name in ["US President Nicknames"]:
if show:
print(f"{name}:{query}")
endpoint = SPARQL(query.endpoint)
try:
qlod = endpoint.queryAsListOfDicts(query.query)
for tablefmt in ["mediawiki", "github", "latex"]:
doc = query.documentQueryResult(
qlod, tablefmt=tablefmt, floatfmt=".0f"
)
docstr = doc.asText()
if show:
print(docstr)

except Exception as ex:
print(f"{query.title} at {query.endpoint} failed: {ex}")
self.runQuery(query,show=show)

def testUnicode2LatexWorkaround(self):
"""
Expand Down Expand Up @@ -478,7 +504,7 @@ def testIssue89(self):
"""
test fix TypeError('Object of type datetime is not JSON serializable') #89
"""
queriesPath = f"{os.path.dirname(__file__)}/../sampledata/wikidata.yaml"
queriesPath = self.wikidata_queries_path
args = [
"-qp",
f"{queriesPath}",
Expand Down

0 comments on commit 2261566

Please sign in to comment.