Skip to content

Commit

Permalink
Added option to standardize (or not) and to add graph specific labels…
Browse files Browse the repository at this point in the history
… (or not) for conversion to RDF
  • Loading branch information
IKCAP committed May 15, 2024
1 parent d9dcee0 commit 63ee972
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 22 deletions.
8 changes: 4 additions & 4 deletions pylipd/lipd.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def load_from_dir(self, dir_path, parallel=False, cutoff=None):


# Allows loading http locations
def load(self, lipdfiles, parallel=False):
def load(self, lipdfiles, parallel=False, standardize=True, add_labels=True):
'''Load LiPD files.
Expand Down Expand Up @@ -137,14 +137,14 @@ def load(self, lipdfiles, parallel=False):

numfiles = len(lipdfiles)
print(f"Loading {numfiles} LiPD files")
self.graph = multi_load_lipd(self.graph, lipdfiles, parallel)
self.graph = multi_load_lipd(self.graph, lipdfiles, parallel, standardize, add_labels)
print("Loaded..")

#def load_from_lipdverse(self, datasetID, version=None):



def convert_lipd_dir_to_rdf(self, lipd_dir, rdf_file, parallel=False):
def convert_lipd_dir_to_rdf(self, lipd_dir, rdf_file, parallel=False, standardize=True, add_labels=False):
'''Convert a directory containing LiPD files into a single RDF file (to be used for uploading to Knowledge Bases like GraphDB)
Parameters
Expand All @@ -166,7 +166,7 @@ def convert_lipd_dir_to_rdf(self, lipd_dir, rdf_file, parallel=False):

print(f"Converting {len(filemap.keys())} LiPD files to RDF..")

multi_convert_to_rdf(filemap, parallel)
multi_convert_to_rdf(filemap, parallel, standardize, add_labels)

print("Conversion to RDF done..")

Expand Down
4 changes: 3 additions & 1 deletion pylipd/usage_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
L.convert_lipd_dir_to_rdf(
local_lipd_dir,
local_lipd_dir+".nq",
parallel=True)
parallel=True,
standardize=True,
add_labels=False)
exit()


Expand Down
18 changes: 14 additions & 4 deletions pylipd/utils/lipd_to_rdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ class LipdToRDF:
It uses the SCHEMA dictionary (from globals/schema.py) to do the conversion
"""

def __init__(self):
def __init__(self, standardize=True, add_labels=True):
self.graph = ConjunctiveGraph()
self.lipd_csvs = {}
self.graphurl = NSURL
self.namespace = NSURL + "/"
self.standardize = standardize
self.add_labels = add_labels
self.schema = expand_schema(copy.deepcopy(SCHEMA))


Expand Down Expand Up @@ -465,6 +467,11 @@ def _add_standard_variable(self, obj, objhash) :
synonyms = SYNONYMS["VARIABLES"]["PaleoVariable"]
if type(name) is str and name.lower() in synonyms:
obj["hasStandardVariable"] = synonyms[name.lower()]["id"]
# Only add object label in the current graph if set
if self.add_labels:
label = synonyms[name.lower()]["label"]
self._set_object_label(obj["hasStandardVariable"], label)

return [obj, objhash, []]

def _stringify_column_numbers_array(self, obj, objhash):
Expand Down Expand Up @@ -883,12 +890,15 @@ def _create_individual_full(self, obj) :

# Set property value
if dtype == "Individual":
if type(value) is str and value.lower() in synonyms:
if self.standardize and type(value) is str and value.lower() in synonyms:
# Only standardize if set to standardize
propDI[1] = "EnumeratedIndividual" # Rename property type to be an enumeration
synid = synonyms[value.lower()]["id"]
label = synonyms[value.lower()]["label"]
self._set_property_value(objid, propDI, synid)
self._set_object_label(synid, label)
# Only add object label in the current graph if set
if self.add_labels:
label = synonyms[value.lower()]["label"]
self._set_object_label(synid, label)
else:
self._set_property_value(objid, propDI, value)
elif type(value) is dict:
Expand Down
27 changes: 14 additions & 13 deletions pylipd/utils/multi_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
from ..globals.queries import QUERY_ALL_VARIABLES_GRAPH
from .lipd_to_rdf import LipdToRDF
import multiprocessing as mp
import copy

def convert_to_rdf(files):
(lipdfile, rdffile) = files
converter = LipdToRDF()
(lipdfile, rdffile, standardize, add_labels) = files
converter = LipdToRDF(standardize, add_labels)
"""Worker that converts one lipdfile to an rdffile"""
try:
converter.convert(lipdfile)
Expand All @@ -16,44 +15,46 @@ def convert_to_rdf(files):
raise e


def multi_convert_to_rdf(filemap, parallel=True):
def multi_convert_to_rdf(filemap, parallel=True, standardize=True, add_labels=True):
if parallel:
"""Create a pool to convert all lipdfiles to rdffiles"""
args = [(lipdfile, rdffile) for lipdfile, rdffile in filemap.items()]
args = [(lipdfile, rdffile, standardize, add_labels) for lipdfile, rdffile in filemap.items()]
pool = mp.Pool(mp.cpu_count())
for file in tqdm(pool.imap_unordered(convert_to_rdf, args, chunksize=1), total=len(args)):
pass
pool.close()
else:
for lipdfile, rdffile in filemap.items():
convert_to_rdf((lipdfile, rdffile))
convert_to_rdf((lipdfile, rdffile, standardize, add_labels))


def convert_lipd_to_graph(lipdfile):
def convert_lipd_to_graph(arg):
(lipdfile, standardize, add_labels) = arg
"""Worker that converts one lipdfile to an RDF graph"""
try:
converter = LipdToRDF()
converter = LipdToRDF(standardize, add_labels)
converter.convert(lipdfile)
return converter.graph
except Exception as e:
print(f"ERROR: Could not convert LiPD file {lipdfile} to RDF")
raise e


def multi_load_lipd(graph, lipdfiles, parallel=True):
def multi_load_lipd(graph, lipdfiles, parallel=True, standardize=True, add_labels=True):
"""Load all lipdfiles to the RDF graph"""
args = [(file, standardize, add_labels) for file in lipdfiles]
if parallel:
with mp.Pool(mp.cpu_count()) as pool:
for subgraph in tqdm(pool.imap_unordered(convert_lipd_to_graph, lipdfiles, chunksize=1), total=len(lipdfiles)):
for subgraph in tqdm(pool.imap_unordered(convert_lipd_to_graph, args, chunksize=1), total=len(lipdfiles)):
graph.addN(subgraph.quads())
del subgraph
pool.close()
pool.join()

else:
for i in tqdm(range(0, len(lipdfiles))):
lipdfile = lipdfiles[i]
subgraph = convert_lipd_to_graph(lipdfile)
for i in tqdm(range(0, len(args))):
arg = args[i]
subgraph = convert_lipd_to_graph(arg)
graph.addN(subgraph.quads())
del subgraph
return graph
Expand Down

0 comments on commit 63ee972

Please sign in to comment.