-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
executable file
·223 lines (178 loc) · 5.98 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
from flask import Flask
from dotenv import load_dotenv
import os
import json
import csv
import re
import importlib.util
app = Flask(__name__)
load_dotenv()
# File fetching
def load_json(filename):
path = os.path.abspath(os.path.dirname(__file__)) + \
filename
f = open(path, 'r', encoding='utf-8-sig')
data = json.load(f)
f.close()
return data
def load_csv(filename, delimiter='|', quotechar='"'):
path = os.path.abspath(os.path.dirname(__file__)) + \
filename
reader = None
file = open(path, 'r', encoding='utf-8-sig')
reader = csv.reader(file, delimiter=delimiter, quotechar=quotechar)
return file, reader
def load_middlware(filename):
path = os.path.abspath(os.path.dirname(__file__)) + \
filename
# extract filename from /middleware/' + filename + '.py'
module_name = filename.replace('.py', '').replace('/middleware/', '.')
spec = importlib.util.spec_from_file_location(module_name, path)
module = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(module)
return module
except Exception as e:
return None
def cleanup(file):
file.close()
return file
# Templating
def parseMiddleware(key, value, middleware):
try:
func_name = key.title().replace(' ', '') # sign Up -> SignUp
middleware_function = getattr(middleware, func_name)
return middleware_function(value)
except Exception as e:
# app.logger.info(e)
# app.logger.info(func_name)
return value
def replace_variables(json_string, data, lookup, middleware):
# Lookup is a dictionary of column name to index
for key, value in lookup.items():
value = parseMiddleware(key, data[lookup[key]], middleware)
# check if value is an array
if type(value) is list:
value = ",".join(value)
# check if value is a dictionary
elif type(value) is dict:
value = json.dumps(value)
# check if value is a string
elif type(value) is str:
value = '"' + value + '"'
# check if value is a number
elif type(value) is int or type(value) is float:
value = value
# check if value is a boolean
elif type(value) is bool:
value = value
# check if value is None
elif value is None:
value = "" # these are stripped out in the end of convert()
else:
value = '"' + value + '"'
json_string = re.sub(r'\"{{ (.*' + key + '?) }}\"',
value, json_string)
return json_string
# https://towardsdatascience.com/flattening-json-objects-in-python-f5343c794b10
def flatten_json(y):
out = {}
def flatten(x, name=''):
if type(x) is dict:
for a in x:
flatten(x[a], name + a + '.')
elif type(x) is list:
i = 0
for a in x:
flatten(a, name + str(i) + '.')
i += 1
else:
out[name[:-1]] = x
flatten(y)
return out
# Relevant columns
def getColNames(reader):
column_names = []
for row in reader:
for column in row:
column_names.append(column)
break
return column_names
def getColumnIndexes(reader, data_template):
# Used for replacing variables in json template
# by providing an index to the relevant column
# ie row[lookup[Title]]
column_names = getColNames(reader)
relevantColumns = {}
for value in data_template.values():
# Check if value is a string and if it contains "{{"
if type(value) is str and "{{" in value:
value = value.replace("{{ ", '').replace(" }}", '')
relevantColumns[value] = column_names.index(value)
else:
continue
return relevantColumns
def convert(
migration_source_pathname,
schema_pathname,
middleware_pathname,
delimiter='|',
quotechar='"'):
schema = load_json(schema_pathname)
file, reader = load_csv(migration_source_pathname, delimiter, quotechar)
middleware = load_middlware(middleware_pathname)
# Generate
data_template = flatten_json(schema)
relevantColumns = getColumnIndexes(reader, data_template)
# Generate entries from csv
entries = []
for row in reader:
entry_string = replace_variables(
json.dumps(schema),
row,
relevantColumns,
middleware
)
# clean up any empty key value pairs
entry_string = re.sub(r'(, "\w+": "")|("\w+": "",)',
'', entry_string)
entry = json.loads(entry_string)
entries.append(entry)
cleanup(file)
# Make filename
filename = file.name.replace(
'migration-source', 'migrated').rsplit('.', maxsplit=1)[0] + '.json'
with open(filename, 'w', encoding='utf-8-sig') as outfile:
json.dump(entries, outfile)
def convertFiles(filenames, delimiter='|', quotechar='"'):
# generate source files from filenames at /migration-source/xxx.csv
for filename in filenames:
convert(
'/migration-source/' + filename + '.csv',
'/schemas/' + filename + '.json',
'/middleware/' + filename + '.py',
delimiter=",",
quotechar='"'
)
return "success"
# Flask Routes
@app.route('/translate')
def translate():
filenames = [
# 'sample',
# 'dealers',
# 'orders',
'products',
# 'reviews',
]
return convertFiles(filenames)
@app.after_request
def after_request(response):
response.headers.add('Access-Control-Allow-Headers',
'Content-Type, Authorization')
response.headers.add('Access-Control-Allow-Methods',
'GET, POST, PATCH, DELETE, OPTIONS')
response.headers.add('Access-Control-Allow-Origin', '*')
return response
if __name__ == '__main__':
app.run(debug=True, use_debugger=True, use_reloader=True)