Skip to content

Commit

Permalink
Merge pull request #425 from mboudet/dev
Browse files Browse the repository at this point in the history
Manage file processing in background
  • Loading branch information
mboudet authored Nov 7, 2023
2 parents 4569fd5 + c985968 commit e443c6a
Show file tree
Hide file tree
Showing 19 changed files with 352 additions and 103 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ This changelog was started for release 4.2.0.
- "askomics:instancesLabel" predicate can be defined at the entity-level, to set a specific attribute URI as the 'label' (ie, visible by default)
- Play the same role as 'askomics:instancesHaveNoLabels', except a specific attribute is visible instead of the URI attribute.
- Added the *TIMEOUT* env variable, which will set the web workers timeout value. Default 300s
- Added the 'skip_rdf_preview' config option. This will skip loading RDF files in memory to get the location and remote graph. Warning: This means you must enter the values at the integration step yourself.

### Changed

- Now pre-process files to store integration parameters (columns, entities, etc.) in DB, to avoid re-processing every time.
- Rewrote the ontology part. Instead of specifying 'children of' and other values, users can tick the 'recursive' button to customize the query. While this is less intuitive, this change is more flexible for the various types of ontological relations

## [4.5.0] - 2023-10-20
Expand Down
24 changes: 20 additions & 4 deletions askomics/api/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ def upload_chunk():
}), 400

data = request.get_json()

skip_preview = data.get('skip_preview', False)
if not (data and all([key in data for key in ["first", "last", "size", "name", "type", "size", "chunk"]])):
return jsonify({
"path": '',
Expand All @@ -146,7 +148,7 @@ def upload_chunk():

try:
files = FilesHandler(current_app, session)
path = files.persist_chunk(data)
path = files.persist_chunk(data, skip_preview)
except Exception as e:
traceback.print_exc(file=sys.stdout)
return jsonify({
Expand Down Expand Up @@ -243,6 +245,9 @@ def get_preview():

results = []
for file in files_handler.files:
if file.status == "error":
continue

file.set_preview()
res = file.get_preview()
results.append(res)
Expand All @@ -254,11 +259,19 @@ def get_preview():
'errorMessage': str(e)
}), 500

errorMessage = ''
error = False
errorCode = 200
if not results:
errorMessage = "None of the selected files are in an integrable state"
error = True
errorCode = 400

return jsonify({
'previewFiles': results,
'error': False,
'errorMessage': ''
})
'error': error,
'errorMessage': errorMessage
}), errorCode


@file_bp.route('/api/files/delete', methods=['POST'])
Expand Down Expand Up @@ -332,6 +345,9 @@ def integrate():

for file in files_handler.files:

if file.status == "error":
continue

data["externalEndpoint"] = data["externalEndpoint"] if (data.get("externalEndpoint") and isinstance(file, RdfFile)) else None
data["externalGraph"] = data["externalGraph"] if (data.get("externalGraph") and isinstance(file, RdfFile)) else None
data["customUri"] = data["customUri"] if (data.get("customUri") and not isinstance(file, RdfFile)) else None
Expand Down
17 changes: 17 additions & 0 deletions askomics/libaskomics/BedFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ def __init__(self, app, session, file_info, host_url=None, external_endpoint=Non

def set_preview(self):
"""Set entity name preview"""

if self.preview:
self.entity_name = self.preview['entity_name']
return

try:
BedTool(self.path).count()
self.entity_name = self.human_name
Expand All @@ -49,6 +54,18 @@ def set_preview(self):
self.error_message = "Malformated BED ({})".format(str(e))
traceback.print_exc(file=sys.stdout)

def save_preview(self):
    """Compute the BED preview and persist it in the database.

    Runs set_preview() to parse the file; on failure the error message is
    stored (which also flips the file status to "error"), otherwise the
    detected entity name is cached so later previews can skip re-parsing.
    """
    self.set_preview()

    if self.error:
        # Parsing failed: persist only the failure reason, no payload
        self.save_preview_in_db(None, self.error_message)
    else:
        # Cache the entity name derived from the BED file
        self.save_preview_in_db({'entity_name': self.entity_name}, None)

def get_preview(self):
"""Get file preview
Expand Down
6 changes: 6 additions & 0 deletions askomics/libaskomics/CsvFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ def set_preview(self):
self.set_preview_and_header()
self.set_columns_type()

def save_preview(self):
    """Mark this CSV/TSV file as processed without caching a preview payload.

    Stores no preview data and no error; save_preview_in_db will therefore
    set the file status to "available".
    """
    self.save_preview_in_db(None, None)

def get_preview(self):
"""Get a preview of the file
Expand Down
22 changes: 22 additions & 0 deletions askomics/libaskomics/Database.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,28 @@ def create_files_table(self):
except Exception:
pass

query = '''
ALTER TABLE files
ADD preview text NULL
DEFAULT(NULL)
'''

try:
self.execute_sql_query(query)
except Exception:
pass

query = '''
ALTER TABLE files
ADD preview_error text NULL
DEFAULT(NULL)
'''

try:
self.execute_sql_query(query)
except Exception:
pass

def create_abstraction_table(self):
"""Create abstraction table"""
query = """
Expand Down
20 changes: 20 additions & 0 deletions askomics/libaskomics/File.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import os
import json
import time
from dateutil import parser
from urllib.parse import quote
Expand Down Expand Up @@ -92,6 +93,8 @@ def __init__(self, app, session, file_info, host_url=None, external_endpoint=Non
self.path = file_info['path']
self.type = file_info['type']
self.size = file_info['size']
self.status = file_info['status']
self.preview = json.loads(file_info['preview']) if file_info['preview'] else None
self.id = file_info['id']
self.public = False
self.ntriples = 0
Expand Down Expand Up @@ -536,3 +539,20 @@ def convert_type(self, value, try_date=False):
return value

return value

def save_preview_in_db(self, preview, error):
    """Persist a computed preview (or the error it raised) for this file.

    Parameters
    ----------
    preview : dict or None
        Preview payload; JSON-serialized before storage. Falsy values
        are stored as NULL.
    error : str or None
        Error message. When truthy the file status becomes "error",
        otherwise "available".
    """
    db = Database(self.app, self.session)

    # A truthy error flips the file into the "error" state
    new_status = "error" if error else "available"
    serialized = json.dumps(preview) if preview else None

    query = '''
    UPDATE files SET
    preview = ?,
    preview_error = ?,
    status = ?
    WHERE id= ?
    '''

    db.execute_sql_query(query, (serialized, error, new_status, self.id))
50 changes: 34 additions & 16 deletions askomics/libaskomics/FilesHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def get_files_infos(self, files_id=None, files_path=None, return_path=False):
subquery_str = '(' + ' OR '.join(['id = ?'] * len(files_id)) + ')'

query = '''
SELECT id, name, type, size, path, date, status
SELECT id, name, type, size, path, date, status, preview, preview_error
FROM files
WHERE user_id = ?
AND {}
Expand All @@ -102,7 +102,7 @@ def get_files_infos(self, files_id=None, files_path=None, return_path=False):
subquery_str = '(' + ' OR '.join(['path = ?'] * len(files_path)) + ')'

query = '''
SELECT id, name, type, size, path, date, status
SELECT id, name, type, size, path, date, status, preview, preview_error
FROM files
WHERE user_id = ?
AND {}
Expand All @@ -113,7 +113,7 @@ def get_files_infos(self, files_id=None, files_path=None, return_path=False):
else:

query = '''
SELECT id, name, type, size, path, date, status
SELECT id, name, type, size, path, date, status, preview, preview_error
FROM files
WHERE user_id = ?
'''
Expand All @@ -128,7 +128,9 @@ def get_files_infos(self, files_id=None, files_path=None, return_path=False):
'type': row[2],
'size': row[3],
'date': row[5],
'status': row[6]
'status': row[6],
'preview': row[7],
'preview_error': row[8]
}
if return_path:
file['path'] = row[4]
Expand Down Expand Up @@ -206,7 +208,7 @@ def write_data_into_file(self, data, file_name, mode, should_exist=False):
with open(file_path, mode) as file:
file.write(data)

def store_file_info_in_db(self, name, filetype, file_name, size, status="available", task_id=None):
def store_file_info_in_db(self, name, filetype, file_name, size, status="available", task_id=None, skip_preview=False):
"""Store the file info in the database
Parameters
Expand Down Expand Up @@ -239,10 +241,15 @@ def store_file_info_in_db(self, name, filetype, file_name, size, status="availab
?,
?,
?,
?,
?,
?
)
'''

if not skip_preview:
status = 'processing'

# Type
if filetype in ('text/tab-separated-values', 'tabular'):
filetype = 'csv/tsv'
Expand All @@ -259,9 +266,13 @@ def store_file_info_in_db(self, name, filetype, file_name, size, status="availab

self.date = int(time.time())

return database.execute_sql_query(query, (self.session['user']['id'], name, filetype, file_path, size, self.date, status, task_id), get_id=True)
id = database.execute_sql_query(query, (self.session['user']['id'], name, filetype, file_path, size, self.date, status, task_id, None, None), get_id=True)

if not skip_preview:
self.app.celery.send_task('save_preview', ({"user": self.session["user"]}, id))
return id

def update_file_info(self, file_id, size=None, status="", task_id=""):
def update_file_info(self, file_id, size=None, status="", task_id="", error=None):
"""Update file size and status
Parameters
Expand All @@ -285,33 +296,39 @@ def update_file_info(self, file_id, size=None, status="", task_id=""):
size_query = ""
status_query = ""
task_query = ""
error_query = ""

# Should be a cleaner way of doing this...
if size is not None:
size_query = "size=?," if (status or task_id) else "size=?"
size_query = "size=?," if (status or task_id or error) else "size=?"
query_vars.append(size)

if status:
status_query = "status=?," if task_id else "status=?"
status_query = "status=?," if (task_id or error) else "status=?"
query_vars.append(status)

if task_id:
task_query = "task_id=?"
task_query = "task_id=?," if error else "task_id=?"
query_vars.append(task_id)

if error:
error_query = "preview_error=?"
query_vars.append(error)

query_vars.append(file_id)

query = '''
UPDATE files SET
{}
{}
{}
{}
WHERE id=?
'''.format(size_query, status_query, task_query)
'''.format(size_query, status_query, task_query, error_query)

database.execute_sql_query(query, tuple(query_vars))

def persist_chunk(self, chunk_info):
def persist_chunk(self, chunk_info, skip_preview=False):
"""Persist a file by chunk. Store info in db if the chunk is the last
Parameters
Expand All @@ -324,14 +341,15 @@ def persist_chunk(self, chunk_info):
str
local filename
"""

try:
# 1 chunk file
if chunk_info["first"] and chunk_info["last"]:
# Write data into file
file_name = self.get_file_name()
self.write_data_into_file(chunk_info["chunk"], file_name, "w")
# store file info in db
self.store_file_info_in_db(chunk_info["name"], chunk_info["type"], file_name, chunk_info["size"])
self.store_file_info_in_db(chunk_info["name"], chunk_info["type"], file_name, chunk_info["size"], skip_preview=skip_preview)
# first chunk of large file
elif chunk_info["first"]:
file_name = self.get_file_name()
Expand All @@ -340,7 +358,7 @@ def persist_chunk(self, chunk_info):
elif chunk_info["last"]:
file_name = chunk_info["path"]
self.write_data_into_file(chunk_info["chunk"], file_name, "a")
self.store_file_info_in_db(chunk_info["name"], chunk_info["type"], file_name, chunk_info["size"])
self.store_file_info_in_db(chunk_info["name"], chunk_info["type"], file_name, chunk_info["size"], skip_preview=skip_preview)
# chunk of large file
else:
file_name = chunk_info["path"]
Expand Down Expand Up @@ -395,8 +413,8 @@ def download_url(self, url, task_id):
# Update final value
self.update_file_info(file_id, size=os.path.getsize(path), status="available")

except Exception:
self.update_file_info(file_id, size=os.path.getsize(path), status="error")
except Exception as e:
self.update_file_info(file_id, size=os.path.getsize(path), status="error", error=str(e))

def get_type(self, file_ext):
"""Get files type, based on extension
Expand Down
17 changes: 17 additions & 0 deletions askomics/libaskomics/GffFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ def __init__(self, app, session, file_info, host_url=None, external_endpoint=Non

def set_preview(self):
"""Summary"""

if self.preview:
self.entities = self.preview['entities']
return

try:
exam = GFFExaminer()
handle = open(self.path, encoding="utf-8", errors="ignore")
Expand All @@ -67,6 +72,18 @@ def set_preview(self):
self.error_message = "Malformated GFF ({})".format(str(e))
traceback.print_exc(file=sys.stdout)

def save_preview(self):
    """Compute the GFF preview and persist it in the database.

    Runs set_preview() to examine the file; on failure the error message is
    stored (which also flips the file status to "error"), otherwise the
    list of detected entities is cached for later previews.
    """
    self.set_preview()

    if self.error:
        # Examination failed: persist only the failure reason, no payload
        self.save_preview_in_db(None, self.error_message)
    else:
        # Cache the entities extracted from the GFF file
        self.save_preview_in_db({'entities': self.entities}, None)

def get_preview(self):
"""Get gff file preview (list of entities)
Expand Down
Loading

0 comments on commit e443c6a

Please sign in to comment.