-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvrt2rdf.py
147 lines (121 loc) · 4.83 KB
/
vrt2rdf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import os
import logging
import argparse
from datetime import datetime
from jinja2 import Template
from osgeo import ogr
# Root logger shared by the whole script; main() attaches the handler and sets the level.
logger = logging.getLogger()
# Path of the Jinja2 template rendered into the RDF output.
# main() overrides it when -t/--template is given.
rdf_template = "templates/territoires.rdf.j2"
# Path of the file vrt2rdf() writes the rendered RDF to.
# main() replaces it with --out_file, or with a name derived from the template file name.
rdf_out_file = "territoire.rdf"
class SkosFeature:
    """Template-friendly wrapper around one OGR feature describing a SKOS concept.

    The constructor copies a fixed set of attribute fields from the feature
    onto the instance. A field that cannot be read is stored as ``None``.

    When the feature has a geometry, its envelope is exposed as ``minX``,
    ``maxX``, ``minY`` and ``maxY``. When the envelope cannot be read, those
    four attributes are deliberately left unset (not set to ``None``), so
    that existing templates keep seeing them as undefined.
    """

    def __init__(self, ogr_feature):
        """Read the SKOS-related fields and the bounding box of *ogr_feature*.

        Args:
            ogr_feature: object supporting ``feature[field_name]`` lookups
                and ``GetGeometryRef()`` (an OGR feature in normal use).
        """
        self.feature = ogr_feature
        self.id = self.get_field_or_null('id')
        self.name = self.get_field_or_null('name')
        self.description = self.get_field_or_null('description')
        self.admin_type = self.get_field_or_null('admin_type')
        self.skos_prefix = self.get_field_or_null('skos_prefix')
        self.narrower_prefix = self.get_field_or_null('narrower_prefix')
        self.narrower_ids = self.get_field_or_null('narrower_ids')
        self.broader_prefix = self.get_field_or_null('broader_prefix')
        self.broader_ids = self.get_field_or_null('broader_ids')
        try:
            min_x, max_x, min_y, max_y = ogr_feature.GetGeometryRef().GetEnvelope()
        except Exception:
            # Best effort: a feature without a usable geometry simply has no
            # bounding box. `Exception` (rather than a bare except) lets
            # KeyboardInterrupt / SystemExit propagate.
            pass
        else:
            self.minX = min_x
            self.maxX = max_x
            self.minY = min_y
            self.maxY = max_y

    def get_field_or_null(self, fid):
        """Return the value of field *fid*, or ``None`` if it cannot be read."""
        try:
            return self.feature[fid]
        except Exception:
            # Missing field (or any other lookup failure): treat as "no value".
            return None

    @staticmethod
    def _split_ids(raw_ids):
        """Turn a comma-separated ID value into a clean list of ID strings.

        ``None`` gives an empty list. Non-string values (e.g. a single
        integer ID) are converted with ``str``. Whitespace around each ID is
        stripped and empty entries (such as the one produced by a trailing
        comma) are dropped.
        """
        if raw_ids is None:
            return []
        stripped = (part.strip() for part in str(raw_ids).split(','))
        return [part for part in stripped if part]

    def get_broader_ids_as_list(self):
        """Return the IDs listed in ``broader_ids`` as a list of strings."""
        return self._split_ids(self.broader_ids)

    def get_narrower_ids_as_list(self):
        """Return the IDs listed in ``narrower_ids`` as a list of strings."""
        return self._split_ids(self.narrower_ids)
def main(argv=None):
    """Command-line entry point: parse arguments, set up logging, run the conversion.

    Args:
        argv: optional list of command-line arguments (without the program
            name). ``None`` (the default) makes argparse read ``sys.argv``.

    Side effects:
        Adds a handler to the module-level ``logger`` and sets its level.
        Rebinds the module-level ``rdf_template`` (only when -t is given) and
        ``rdf_out_file`` (always), then calls ``vrt2rdf`` on the VRT path.
    """
    global rdf_template, rdf_out_file

    # Input arguments
    parser = argparse.ArgumentParser(description='''
    Reads the input VRT, iterates through the layers and produces a RDF thesaurus suitable for GeoNetwork as a
    geographic thesaurus. Using VRT as input, it allows you a lot of flexibility in the data sources you want to use,
    the VRT providing an abstraction layer between your data sources and this script.
    ''')
    parser.add_argument('vrtfile', metavar='VRT file path', help='path to the input VRT file')
    parser.add_argument('-o', '--out_file',
                        help='Output file name. Default: name of the template, without the jinja extension')
    parser.add_argument('-v', '--verbose', help='verbose output (debug loglevel)',
                        action='store_true')
    parser.add_argument('--logfile',
                        help='logfile path. Default: prints logs to the console')
    parser.add_argument('-t', '--template',
                        help='template file path. Default: templates/territoires.rdf.j2')
    args = parser.parse_args(argv)

    # Initialize the logger: log to a file when --logfile is given, to the console otherwise
    if args.logfile:
        handler = logging.FileHandler(args.logfile)
    else:
        handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)-5s %(levelname)-3s %(message)s'))
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)

    # Initialize global vars
    if args.template:
        rdf_template = args.template
    if args.out_file:
        rdf_out_file = args.out_file
    else:
        # extract the output file name from the template file (removing the path and the jinja extension)
        rdf_out_file = os.path.splitext(os.path.basename(rdf_template))[0]

    vrt2rdf(args.vrtfile)
def vrt2rdf(filename):
    """Convert the data source at *filename* into an RDF file.

    Collects the features of every layer, renders them through the template
    and writes the result to the module-level ``rdf_out_file`` path. Nothing
    is written when the rendered document is empty.

    Args:
        filename: path of the VRT (or other OGR-readable) data source.
    """
    features = collect_features(filename)
    skos_xml = features_to_skos_xml(features)
    logger.debug(skos_xml)
    if not skos_xml:
        logger.warning("Template rendered no content, nothing written")
        return
    logger.info("Writing RDF data to {}".format(rdf_out_file))
    # Explicit UTF-8: XML is UTF-8 unless declared otherwise, and the platform
    # default encoding may not be able to represent every feature name.
    with open(rdf_out_file, 'w', encoding='utf-8') as f:
        f.write(skos_xml)
def features_to_skos_xml(features):
    """Render *features* through the Jinja2 template at ``rdf_template``.

    The template receives two variables: ``date`` (the current local
    ``datetime``) and ``feats`` (the *features* argument, passed through
    unchanged).

    Args:
        features: the objects to expose to the template as ``feats``.

    Returns:
        The rendered document as a string.
    """
    logger.info("Using template file {}".format(rdf_template))
    # Read the template as UTF-8 explicitly so non-ASCII text in it does not
    # depend on the platform default encoding.
    with open(rdf_template, encoding='utf-8') as file_:
        template = Template(file_.read())
    return template.render(date=datetime.now(), feats=features)
def get_features_from_layer(layer):
    """Wrap every feature of *layer* in a ``SkosFeature``.

    Args:
        layer: an iterable layer object providing ``GetName()`` and
            ``ResetReading()`` (an OGR layer in normal use).

    Returns:
        A list with one ``SkosFeature`` per feature, in iteration order.
    """
    logger.info('Processing layer {}'.format(layer.GetName()))
    # Rewind the layer so iteration starts at the first feature.
    layer.ResetReading()
    features_list = [SkosFeature(feature) for feature in layer]
    logger.info("Collected {} features".format(len(features_list)))
    return features_list
def collect_features(filename):
    """Open *filename* with OGR and gather the features of all its layers.

    Args:
        filename: path of the VRT (or other OGR-readable) data source.

    Returns:
        A flat list of ``SkosFeature`` objects, layer after layer.

    Raises:
        RuntimeError: if OGR cannot open the data source.
    """
    data_source = ogr.Open(filename)
    if data_source is None:
        # ogr.Open returns None on failure when GDAL exceptions are disabled;
        # fail with a clear message instead of an AttributeError on None.
        raise RuntimeError("Unable to open data source {}".format(filename))
    features_list = []
    for layer_id in range(data_source.GetLayerCount()):
        layer = data_source.GetLayerByIndex(layer_id)
        features_list.extend(get_features_from_layer(layer))
    return features_list
# Run the command-line interface only when executed as a script, not when imported.
if __name__ == '__main__':
    main()